In [1]:
!pip install folium
!pip install boto3
!pip install censusgeocode

import numpy as np
import pandas as pd
import folium
from folium.plugins import MarkerCluster
import boto3
import pyspark as ps# for the pyspark suite
import census_tract as a
import api2 as ap




# Local Spark session using 4 worker threads; `sc` is its SparkContext, used by the
# pipeline functions to parallelize per-customer work.
spark = (
    ps.sql.SparkSession.builder
    .master("local[4]")
    .appName("case study")
    .getOrCreate()
)
sc = spark.sparkContext

# boto3 resource and client for downloading/uploading files to S3
s3_connection = boto3.resource('s3')
s3_client = boto3.client('s3')



In [2]:
## The Pipeline 



def load_csv_from_s3(bucketname, filename, **read_csv_kwargs):
    """
    Download a CSV object from S3 and parse it with pandas.

    Input:
        bucketname (str): Name of bucket that file is stored in
        filename (str): Key of the csv within the bucket (ex: "cool_data.csv")
        **read_csv_kwargs: optional keyword arguments forwarded unchanged to pd.read_csv
            (ex: dtype=str). With none given, the call is identical to the previous
            argument-less version.

    Output:
        pandas dataframe of the csv
    """
    boto_object = s3_client.get_object(Bucket=bucketname, Key=filename)
    # 'Body' is a file-like stream, which pd.read_csv can consume directly.
    return pd.read_csv(boto_object['Body'], **read_csv_kwargs)


def site_selector(site, df):
    """
    Input:
        site (str): name of the site selected (matched against the 'Park' column)
        df (dataframe): pandas dataframe of all sites to be selected from; must contain
            the columns Park, CustZIP, CustState, CustLat, CustLong and Res_ID

    Output:
        pandas dataframe with one row per distinct customer location at the site, with
        columns: CustZIP, CustState, CustLat, CustLong, Res_Size
        (Res_Size is the number of non-null Res_ID values at that location)
    """
    group_keys = ['CustZIP', 'CustState', 'CustLat', 'CustLong']
    site_rows = df[df['Park'] == site]
    # Count only Res_ID instead of counting every column and then dropping a hard-coded
    # list of the others. The old approach raised KeyError when any listed column was
    # absent and leaked unlisted columns into the result.
    return (
        site_rows.groupby(group_keys)['Res_ID']
        .count()
        .reset_index(name='Res_Size')
    )

def site_customer_mapper(df, popup):
    """
    Input:
        df (dataframe): pandas dataframe characterizing the site with CustLat, CustLong, Res_Size columns
        popup (str): name of the column whose value is shown in each marker's popup
    Output:
        folium map centered on the continental USA, with one clustered, blue-filled circle
        per customer location. Each circle's radius is Res_Size scaled so the largest is 10.
    """
    map_osm = folium.Map(location=[39.8283, -98.5795], zoom_start=4)
    marker_cluster = MarkerCluster().add_to(map_osm)

    # Hoisted out of the per-row loop; it was previously recomputed for every row.
    max_size = df['Res_Size'].max()
    # An empty, all-NaN or all-zero Res_Size column would otherwise divide by zero and
    # give NaN radii. In that case every marker gets the same radius.
    can_scale = pd.notna(max_size) and max_size > 0

    for lat, lng, size, label in zip(df['CustLat'], df['CustLong'], df['Res_Size'], df[popup]):
        radius = float(size) / float(max_size) * 10 if can_scale else 10.0
        # folium.Marker ignores radius/fill_color (they are CircleMarker options), so the
        # original size scaling never rendered. CircleMarker actually draws a sized circle.
        folium.CircleMarker(
            location=[lat, lng],
            radius=radius,
            fill_color='blue',
            popup=str(label),
        ).add_to(marker_cluster)
    return map_osm


def add_census_data(df):
    """
    Input:
        df (dataframe): pandas dataframe characterizing the site w/ columns: CustZIP, CustState, CustLat, CustLong, Res_Size
    Output:
        numpy array of the input rows with census tract, state number, and county number
        added by ap.add_census. Rows for which ap.add_census returned None are dropped.
    """
    census_rows = (
        sc.parallelize(df.to_numpy())
        .map(lambda row: ap.add_census(row.tolist()))
        # ap.add_census can return None for a row (the original filtered those out).
        # `is not None` replaces filter(None.__ne__, ...), which relied on the deprecated
        # truthiness of NotImplemented. Filtering here also avoids collecting the Nones.
        .filter(lambda row: row is not None)
        .collect()
    )
    return pd.DataFrame.from_records(census_rows).to_numpy()



def rdd_to_data(census_data, cluster):
    """
    Input:
        census_data: numpy array of a site's customer information including census tract,
            state number, and county number (one row per customer)
        cluster: census variable cluster, passed through unchanged to ap.add_census_vars
    Output:
        list of rows collected from Spark, each being ap.add_census_vars applied to one
        customer row and the cluster
    """
    def _enrich(customer_row):
        # Spark ships this closure (with `cluster`) to the workers.
        return ap.add_census_vars(customer_row.tolist(), cluster)

    return sc.parallelize(census_data).map(_enrich).collect()


def arr_to_pandas(arr, cluster):
    """
    Input:
        arr: rows of a site's customer information including census data for a given cluster
        cluster: sequence whose k-th entry holds, at index 1, the name for the k-th
            cluster column (the columns that follow the eight base columns)
    Output:
        pandas dataframe of a site's customer information. The first eight columns are named
        ZIP, State, Lat, Lng, Size, Tract, State_code, County, followed by one named
        column per cluster variable.
    """
    base_names = ["ZIP", "State", "Lat", "Lng", "Size", "Tract", "State_code", "County"]
    column_names = dict(enumerate(base_names))
    offset = len(base_names)
    column_names.update({offset + k: entry[1] for k, entry in enumerate(cluster)})
    return pd.DataFrame(arr).rename(columns=column_names)


def export(df, fname, bucket):
    """
    Write a dataframe to a local CSV file and upload that file to S3.

    Input:
        df (dataframe): pandas dataframe of a site's customer information with additional columns for each cluster variable
        fname (str): local path of the CSV; also used as the S3 object key
        bucket (str): name of the destination S3 bucket
    Output:
        None. The CSV (header and index included) stays on local disk and is uploaded to S3.
    """
    df.to_csv(fname, header=True, index=True)
    s3_client.upload_file(fname, bucket, fname)

In [3]:
bucket = 'capstone.1'  # S3 bucket that every load/export in this notebook reads from or writes to

co_sites_df = load_csv_from_s3(bucketname=bucket, filename='co_sites.csv')

In [4]:
ohaver_zips = site_selector(site='OHaver Lake --- FPIN46', df=co_sites_df)  # per-location Res_Size counts for one site

In [5]:
site_customer_mapper(df=ohaver_zips, popup='Res_Size')  # popup shows each location's Res_Size

In [6]:
## Census Variable Clusters

# Variable-name tables for the economic, social and demographic census profiles.
econ_df = load_csv_from_s3(bucket, 'econ_var_names.csv')
social_df = load_csv_from_s3(bucket, 'social_var_names.csv')
demo_df = load_csv_from_s3(bucket, 'demo_var_names.csv')
econ_var_names = econ_df.to_numpy()
social_var_names = social_df.to_numpy()
demo_var_names = demo_df.to_numpy()


def build_clusters(var_names, spans):
    """Return {cluster name: ap.cluster_variables(var_names, span)} for every (name, span) in spans."""
    return {name: ap.cluster_variables(var_names, span) for name, span in spans.items()}


# Each span is a (start, end) pair presumably indexing rows of the matching variable-name
# table. Whether `end` is inclusive is decided by ap.cluster_variables; (67, 67) for
# Veteran_Status suggests it is. TODO confirm in api2.
econ_clusters = {'Industry': (31, 43), 'Commute': (17, 22), 'Income_Benefits': (50, 59), 'Health_Insurance': (94, 96)}
econ = build_clusters(econ_var_names, econ_clusters)
industry = econ['Industry']
commute = econ['Commute']
income_benefits = econ['Income_Benefits']
health = econ['Health_Insurance']

social_clusters = {'Internet': (149, 150), 'Language': (109, 119), 'Education': (57, 65), 'Veteran_Status': (67, 67)}
social = build_clusters(social_var_names, social_clusters)
internet = social['Internet']
language = social['Language']
education = social['Education']
vet_status = social['Veteran_Status']

demo_clusters = {'Age': (3, 15), 'Gender': (0, 1), 'Race': (35, 54), 'Latino': (71, 74)}
demo = build_clusters(demo_var_names, demo_clusters)
age = demo['Age']
gender = demo['Gender']
race = demo['Race']
# 'Latino' was declared in demo_clusters but never resolved; expose it like every other cluster.
latino = demo['Latino']

In [7]:
ohaver_census = add_census_data(df=ohaver_zips)  # adds tract/state/county codes via Spark

In [40]:
ohaver_health = rdd_to_data(census_data=ohaver_census, cluster=health)

In [46]:
ohaver_health_df = arr_to_pandas(arr=ohaver_health, cluster=health)
export(df=ohaver_health_df, fname='ohaver_health_data.csv', bucket=bucket)

In [47]:
ohaver_age = rdd_to_data(census_data=ohaver_census, cluster=age)

In [51]:
ohaver_age_df = arr_to_pandas(arr=ohaver_age, cluster=age)
export(df=ohaver_age_df, fname='ohaver_age_data.csv', bucket=bucket)

In [8]:
ohaver_gender = rdd_to_data(census_data=ohaver_census, cluster=gender)

In [9]:
ohaver_gender_df = arr_to_pandas(arr=ohaver_gender, cluster=gender)
export(df=ohaver_gender_df, fname='ohaver_gender_data.csv', bucket=bucket)

In [10]:
ohaver_vets = rdd_to_data(census_data=ohaver_census, cluster=vet_status)

In [11]:
ohaver_vets_df = arr_to_pandas(arr=ohaver_vets, cluster=vet_status)
export(df=ohaver_vets_df, fname='ohaver_vets_data.csv', bucket=bucket)