In [1]:
import configparser
import os
from datetime import datetime, timedelta

import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SparkSession, Window
from pyspark.sql import types as T, functions as F
from pyspark.sql.functions import *

In [2]:
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot read, which would only
# surface later as a confusing NoSectionError; fail fast instead.
if not config.read("dl.cfg"):
    raise FileNotFoundError("could not read dl.cfg from the current working directory")

# AWS credentials from the [AWS] section of dl.cfg, read once and exported to
# the environment so later cells can look them up via os.environ.
KEY = config.get('AWS', 'AWS_ACCESS_KEY_ID')
SECRET = config.get('AWS', 'AWS_SECRET_ACCESS_KEY')

os.environ['AWS_ACCESS_KEY_ID'] = KEY
os.environ['AWS_SECRET_ACCESS_KEY'] = SECRET

# Machine-specific winutils location for running Spark on Windows; a
# HADOOP_HOME already present in the environment takes precedence.
# NOTE(review): Hadoop looks for %HADOOP_HOME%\bin\winutils.exe, so this
# probably should point at .../hadoop-2.7.1 rather than .../bin/ — confirm.
os.environ.setdefault('HADOOP_HOME', "C:/winutils-master/winutils-master/hadoop-2.7.1/bin/")
#System.setProperty("hadoop.home.dir", "C:\Hadoop")

In [3]:
# Spark session with the hadoop-aws connector so s3a:// paths can be read and
# written. S3A reads its static credentials from fs.s3a.access.key and
# fs.s3a.secret.key; the previous fs.s3a.awsAccessKeyId/awsSecretAccessKey
# names are not S3A properties and were ignored by the connector.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.1")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID'])
    .config("spark.hadoop.fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY'])
    .getOrCreate()
)

In [4]:
def parse_state(x):
    """Return the part after the last '-' of a region code, e.g. 'US-PA' -> 'PA'.

    Surrounding whitespace is stripped first. A missing value (None) yields
    None, so a null region does not raise inside the Spark UDF and fail the job.
    """
    if x is None:
        return None
    return x.strip().split('-')[-1]
udf_parse_state = F.udf(lambda region: parse_state(region), returnType=T.StringType())  # Spark UDF wrapper around parse_state

In [41]:
# (state_code, city) pairs from the demographics file, used to seed the city table.
demo = (
    spark.read.format('csv')
    .load("s3a://capstonekndend/us-cities-demographics.csv", header=True, inferSchema=True, sep=';')
    .select(F.col("State Code").alias("state_code"), F.col("City").alias("city"))
)

In [42]:
# (state_code, city) pairs for US airports, used to seed the city table.
us_airport = (
    spark.read.format('csv')
    .load("s3a://capstonekndend/airport_codes.csv", header=True, inferSchema=True)
    .where(F.col("iso_country") == 'US')
    .withColumn("state", udf_parse_state("iso_region"))
    .select(F.col("state").alias("state_code"), F.col("municipality").alias("city"))
)

In [43]:
# City dimension: every distinct (state_code, city) pair with a surrogate key.
# `city` is lazy and is recomputed by several later joins and writes.
# monotonically_increasing_id() after the drop_duplicates shuffle is
# non-deterministic across recomputations, so city_id could disagree between
# the written city table and the tables joined against it. row_number() over a
# total ordering of the (unique) key columns gives the same id every time.
# A Window without partitionBy processes all rows in a single partition.
city = (
    us_airport.union(demo)  # positional union: both sides are (state_code, city)
    .drop_duplicates()
    .withColumn(
        "city_id",
        F.row_number().over(Window.orderBy("state_code", "city")).cast(T.LongType()),
    )
)

In [44]:
# inferSchema is a CSV read option that the writer ignores; only header applies here.
city.write.mode("overwrite").options(header = True).csv("s3a://capstonekndend/csv/lake/city/")

In [10]:
def parse_lat(x):
    """Return the latitude from an airport ``coordinates`` string.

    NOTE(review): airport_codes.csv appears to store coordinates as
    "longitude, latitude" (e.g. "-74.93360137939453, 40.07080078125" for
    Bensalem, PA), so latitude is the *second* field. The previous version
    read the first field, i.e. the longitude. Confirm against the file.

    Returns None for a missing value instead of failing the Spark job.
    """
    if x is None:
        return None
    parts = x.strip().split(',')
    return float(parts[1])
udf_parse_lat = F.udf(lambda coords: parse_lat(coords), returnType=T.FloatType())  # Spark UDF wrapper around parse_lat

def parse_long(x):
    """Return the longitude from an airport ``coordinates`` string.

    NOTE(review): airport_codes.csv appears to store coordinates as
    "longitude, latitude" (e.g. "-74.93360137939453, 40.07080078125" for
    Bensalem, PA), so longitude is the *first* field. The previous version
    read the second field, i.e. the latitude. Confirm against the file.

    Returns None for a missing value instead of failing the Spark job.
    """
    if x is None:
        return None
    parts = x.strip().split(',')
    return float(parts[0])
udf_parse_long = F.udf(lambda coords: parse_long(coords), returnType=T.FloatType())  # Spark UDF wrapper around parse_long

# parse_state / udf_parse_state are defined once, in the earlier cell that
# builds the city table. The identical redefinition that used to live here was
# removed so there is a single definition that later cells rely on.

In [13]:
# Path to the I94 SAS label-description file. Override it with the
# I94_SAS_LABELS_PATH environment variable instead of editing the notebook.
SAS_LABELS_PATH = os.environ.get(
    'I94_SAS_LABELS_PATH',
    'C:/Users/chait/Desktop/Udacity/I94_SAS_Labels_Descriptions.SAS',
)

with open(SAS_LABELS_PATH) as f:
    # Tabs are removed so every "code = description" line parses the same way.
    f_content = f.read().replace('\t', '')

def code_mapper(file, idx):
    """Parse one code/description section of the SAS labels text into a dict.

    The section starts at the first occurrence of ``idx`` in ``file`` and ends
    at the next ';'. Its first line (the one containing ``idx``) is a header and
    is skipped. Each remaining line of the form ``code = description`` becomes
    one entry, with single quotes and surrounding whitespace removed. Lines that
    do not split into exactly two parts on '=' are ignored.

    The previous version ignored ``file`` and always read the global
    ``f_content``. This version parses the text it is given.

    Args:
        file: full text of the SAS label-description file.
        idx: section name to look up, e.g. "i94prtl".

    Returns:
        dict mapping code -> description (later duplicates win).

    Raises:
        ValueError: if ``idx`` is not found, or no ';' follows it.
    """
    section = file[file.index(idx):]
    lines = section[:section.index(';')].split('\n')[1:]  # drop the header line
    pairs = (line.replace("'", "").split('=') for line in lines)
    return {pair[0].strip(): pair[1].strip() for pair in pairs if len(pair) == 2}

# Code -> description lookups parsed from the SAS label text, one per section.
i94cit_res, i94port, i94mode, i94addr = (
    code_mapper(f_content, section)
    for section in ("i94cntyl", "i94prtl", "i94model", "i94addrl")
)
# Visa category codes are listed directly rather than parsed.
i94visa = dict(zip(('1', '2', '3'), ('Business', 'Pleasure', 'Student')))

In [46]:
# State-code lookup table built from the i94addr dict (code -> state name).
df_state = spark.createDataFrame(
    [[code, name] for code, name in i94addr.items()],
    ["state_code", "state"],
)

# inferSchema is a CSV read option that the writer ignores; only header applies here.
df_state.write.mode("overwrite").options(header = True).csv("s3a://capstonekndend/csv/lake/codes/state_code/")

In [47]:
# Country-code lookup table built from the i94cit_res dict (code -> country name).
df_country = spark.createDataFrame(
    [[code, name] for code, name in i94cit_res.items()],
    ["country_code", "country"],
)

# inferSchema is a CSV read option that the writer ignores; only header applies here.
df_country.write.mode("overwrite").options(header = True).csv("s3a://capstonekndend/csv/lake/codes/country_code/")

In [88]:
# Detailed US airport table: parsed coordinates and state, `ident` exposed as
# icao_code, and the raw location/code columns that are not kept removed.
us_airport = (
    spark.read.format('csv')
    .load("s3a://capstonekndend/airport_codes.csv", header=True, inferSchema=True)
    .withColumn("airport_latitude", udf_parse_lat("coordinates"))
    .withColumn("airport_longitude", udf_parse_long("coordinates"))
    .where(F.col("iso_country") == 'US')
    .withColumn("state", udf_parse_state("iso_region"))
    .withColumnRenamed("ident", "icao_code")
    .drop("coordinates", "gps_code", "local_code", "continent",
          "iso_region", "iso_country", "iata_code")
)

In [89]:
# Replace the airport's municipality/state with the matching city_id
# (left join: airports without a matching city keep a null city_id).
us_airport = us_airport.join(
    city,
    on=(us_airport.municipality == city.city) & (us_airport.state == city.state_code),
    how="left",
).drop("municipality", "state", "city", "state_code")

In [90]:
# inferSchema is a CSV read option that the writer ignores; only header applies here.
us_airport.write.mode("overwrite").options(header = True).csv("s3a://capstonekndend/csv/lake/codes/airport_code/")

In [52]:
# City demographics: cast the population counts to long, rename the kept
# columns to snake_case, then drop the raw source columns.
demo = spark.read.format('csv').load(
    's3a://capstonekndend/us-cities-demographics.csv',
    header = True, inferSchema = True, delimiter = ';',
)
for target, source in (("male_population", "Male Population"),
                       ("female_population", "Female Population"),
                       ("total_population", "Total Population"),
                       ("num_veterans", "Number of Veterans"),
                       ("foreign_born", "Foreign-born")):
    demo = demo.withColumn(target, F.col(source).cast(T.LongType()))
for source, target in (("State Code", "state_code"),
                       ("Race", "race"),
                       ("Median Age", "median_age"),
                       ("City", "city"),
                       ("Average Household Size", "average_household_size")):
    demo = demo.withColumnRenamed(source, target)
demo = demo.drop("State", "Count", "Male Population", "Female Population",
                 "Total Population", "Number of Veterans", "Foreign-born")

In [53]:
# Swap (city, state_code) for city_id; the inner join keeps only matched rows,
# and drop() by name removes both sides' copies of the key columns.
demo = (
    demo.join(city, on=(demo.city == city.city) & (demo.state_code == city.state_code))
        .drop("city", "state_code")
)

In [54]:
# inferSchema is a CSV read option that the writer ignores; only header applies here.
demo.write.mode("overwrite").options(header = True).csv("s3a://capstonekndend/csv/lake/us_cities_demographics/")

In [55]:
def convert_latitude(x):
    """Convert a hemisphere-suffixed latitude such as '57.05N' to signed degrees.

    A trailing 'N' gives a positive value; any other suffix gives a negative one.
    The previous version called float() on the whole string including the
    suffix (ValueError) and then sliced the float; the suffix is now stripped
    before conversion. Returns None for a missing or empty value.
    """
    text = '' if x is None else str(x).strip()
    if not text:
        return None
    magnitude = float(text[:-1])
    return magnitude if text[-1] == 'N' else -magnitude
udf_convert_latitude = F.udf(lambda raw: convert_latitude(raw), returnType=T.FloatType())  # Spark UDF wrapper around convert_latitude


def convert_longitude(x):
    """Convert a hemisphere-suffixed longitude such as '100.53W' to signed degrees.

    A trailing 'E' gives a positive value; any other suffix gives a negative one.
    The previous version called float() on the whole string including the
    suffix (ValueError) and then sliced the float; the suffix is now stripped
    before conversion. Returns None for a missing or empty value.
    """
    text = '' if x is None else str(x).strip()
    if not text:
        return None
    magnitude = float(text[:-1])
    return magnitude if text[-1] == 'E' else -magnitude
udf_convert_longitude = F.udf(lambda raw: convert_longitude(raw), returnType=T.FloatType())  # Spark UDF wrapper around convert_longitude

In [56]:
# US city temperature readings after 2013-08-01 that have a non-null average.
# The output keeps only avg_temp, std_temp and city. Latitude/Longitude are
# therefore dropped directly: the previous version first converted them with a
# UDF and then discarded the converted columns in the same chain (dead work).
thres = F.to_date(F.lit("2013-08-01")).cast(T.TimestampType())
us_weather = (
    spark.read.format('csv')
    .load("s3a://capstonekndend/GlobalLandTemperaturesByCity.csv", header=True, inferSchema=True)
    .where(F.col("dt") > thres)
    .withColumnRenamed("AverageTemperature", "avg_temp")
    .withColumnRenamed("AverageTemperatureUncertainty", "std_temp")
    .withColumnRenamed("City", "city")
    .withColumnRenamed("Country", "country")
    .where(F.col("avg_temp").isNotNull())
    .filter("country = 'United States' ")
    .drop("dt", "country", "Latitude", "Longitude")
)

In [57]:
# Attach city_id by city name only, then drop the name and state columns.
# NOTE(review): `city` can hold the same city name for several states, so a
# reading may be repeated once per matching state here — confirm intended.
us_weather = (
    us_weather.join(city, on="city", how="left")
              .drop("city", "state_code")
)

In [58]:
# inferSchema is a CSV read option that the writer ignores; only header applies here.
us_weather.write.mode("overwrite").options(header = True).csv("s3a://capstonekndend/csv/lake/us_cities_temperatures/")

In [95]:
# Airport name and elevation, enriched with the city name and state code of its city_id.
airport_weather = us_airport.select("name", "elevation_ft", "city_id").join(
    city.select("city", "city_id", "state_code"),
    on="city_id",
    how="left",
)

In [96]:
# Replace state_code with the state name. Joining on the column name keeps a
# single state_code column, which drop() then removes.
airport_weather = (
    airport_weather.join(df_state, on="state_code", how="left")
                   .drop("state_code")
)

In [97]:
# Attach temperature readings to each airport's city, then keep one row per
# airport name.
# NOTE(review): which weather row survives for a given name is unspecified,
# and distinct airports that share a name collapse into one row — confirm.
airport_weather = (
    airport_weather
    .join(us_weather, on="city_id", how="inner")
    .drop("city_id")
    .dropDuplicates(["name"])
)

In [99]:
# inferSchema is a CSV read option that the writer ignores; only header applies here.
airport_weather.write.mode("overwrite").options(header = True).csv("s3a://capstonekndend/csv/lake/us_airports_weather/")

In [73]:
def to_datetime(x):
    """Convert a SAS date (whole days since 1960-01-01) to a ``datetime``.

    ``x`` is truncated with int(). Missing or unconvertible values (None,
    NaN, non-numeric strings, out-of-range numbers) yield None, so the UDF
    produces a null instead of failing. Only these conversion errors are
    caught; the previous bare ``except:`` also hid unrelated errors.
    """
    try:
        return datetime(1960, 1, 1) + timedelta(days=int(x))
    except (TypeError, ValueError, OverflowError):
        return None
udf_to_datetime_sas = F.udf(lambda sas_days: to_datetime(sas_days), returnType=T.DateType())  # Spark UDF wrapper around to_datetime



def to_datetimefrstr(x):
    """Parse an MMDDYYYY string (e.g. '07152016') into a ``datetime``.

    Missing (None) or unparseable values yield None, so the UDF produces a
    null instead of failing. Only these parse errors are caught; the previous
    bare ``except:`` also hid unrelated errors.
    """
    try:
        return datetime.strptime(x, '%m%d%Y')
    except (TypeError, ValueError):
        return None
udf_to_datetimefrstr = F.udf(lambda mmddyyyy: to_datetimefrstr(mmddyyyy), returnType=T.DateType())  # Spark UDF wrapper around to_datetimefrstr

In [74]:
# Person-level immigrant facts from the I94 parquet data, partitioned by
# arrival year/month. The whole source path is read on every run, so the
# output is written with mode("overwrite"); "append" added a second copy of
# every row each time the cell was re-run.
immigrant = (
    spark.read.parquet("s3a://capstonekndend/sas_data/")
    .selectExpr('cast(cicid as int) AS cicid', 'arrdate', 'cast(i94res as int) AS from_country_code',
                'cast(i94bir as int) AS age', 'cast(i94visa as int) AS visa_code',
                'visapost AS visa_post', 'occup AS occupation', 'visatype AS visa_type',
                'cast(biryear as int) AS birth_year', 'gender')
    .withColumn("arrival_date", udf_to_datetime_sas("arrdate"))
    .withColumn("arrival_month", F.month("arrival_date"))
    .withColumn("arrival_year", F.year("arrival_date"))
    # Partition columns: partitionBy moves them into the directory names, so
    # the arrival_* copies above keep the values inside each data row.
    .withColumn("month", F.col("arrival_month"))
    .withColumn("year", F.col("arrival_year"))
    .drop("arrdate")
)

immigrant.write.mode("overwrite").partitionBy("year", "month").options(header = True).csv("s3a://capstonekndend/csv/lake/immigrant/")

In [75]:
# Visit-level immigration facts (port, state, dates, flight and status flags),
# partitioned by arrival year/month. The whole source path is read on every
# run, so the output is written with mode("overwrite"); "append" added a
# second copy of every row each time the cell was re-run.
immigration = (
    spark.read.parquet("s3a://capstonekndend/sas_data/")
    .selectExpr('cast(cicid as int) cicid', 'arrdate', 'i94port AS iata_code', 'i94addr AS state_code',
                'depdate', 'dtaddto', 'airline', 'cast(admnum as long) admnum', 'fltno',
                'entdepa', 'entdepd', 'entdepu', 'matflag')
    .withColumnRenamed("entdepa", "arrival_flag")
    .withColumnRenamed("entdepd", "departure_flag")
    .withColumnRenamed("entdepu", "update_flag")
    .withColumn("arrival_date", udf_to_datetime_sas("arrdate"))
    .withColumn("departure_date", udf_to_datetime_sas("depdate"))
    .withColumn("allowed_until", udf_to_datetimefrstr("dtaddto"))
    .withColumn("arrival_month", F.month("arrival_date"))
    .withColumn("arrival_year", F.year("arrival_date"))
    # Partition columns: partitionBy moves them into the directory names, so
    # the arrival_* copies above keep the values inside each data row.
    .withColumn("month", F.col("arrival_month"))
    .withColumn("year", F.col("arrival_year"))
    .drop("arrdate", "depdate", "dtaddto", "matflag")
)

immigration.write.mode("overwrite").partitionBy("year", "month").options(header = True).csv("s3a://capstonekndend/csv/lake/immigration/")

In [101]:
# State-level demographics aggregated from the city demographics table.
# The table has several rows per city (it carries a `race` column), and the
# city-level figures repeat on each of them. Summing or averaging without
# deduplication counted each city once per row.
# NOTE(review): assumes median_age/total_population/foreign_born are identical
# across a city's rows — confirm against the source CSV.
demo = (
    spark.read.options(header = True, inferSchema = True)
    .csv("s3a://capstonekndend/csv/lake/us_cities_demographics/")
    .select("median_age", "city_id", "total_population", "foreign_born")
    .dropDuplicates(["city_id"])
    .join(city.select("state_code", "city_id"), "city_id")
    .drop("city_id")
    .groupBy("state_code")
    .agg(F.mean("median_age").alias("median_age"),
         F.sum("total_population").alias("total_population"),
         F.sum("foreign_born").alias("foreign_born"))
)

In [102]:
# Person-level columns that feed the immigration/demographics table.
immigration_demographic = immigrant.select(
    "cicid", "from_country_code", "age", "gender", "occupation",
    "arrival_date", "arrival_month", "arrival_year",
)

In [103]:
# Resolve the country code to its name, add the year/month partition
# columns, and drop the code and date columns that are no longer needed.
immigration_demographic = (
    immigration_demographic
    .join(df_country,
          on=immigration_demographic.from_country_code == df_country.country_code,
          how="left")
    .withColumn("month", F.month("arrival_date"))
    .withColumn("year", F.year("arrival_date"))
    .withColumnRenamed("country", "from_country")
    .drop("from_country_code", "country_code", "arrival_date")
)

In [104]:
# Attach each record's address state code (left join keeps unmatched records).
immigration_demographic = immigration_demographic.join(
    immigration.select("cicid", "state_code"), on="cicid", how="left"
)

In [105]:
# Attach the state name. Joining on the column name (rather than an equality
# expression) keeps a single state_code column. The expression form kept both
# sides' copies, which made a subsequent join on "state_code" fail as ambiguous.
immigration_demographic = immigration_demographic.join(df_state, on="state_code", how="left")

In [106]:
# Attach state-level demographics (inner join: records whose state has no
# demographics row are dropped), then discard the join key.
immigration_demographic = (
    immigration_demographic
    .join(demo, on="state_code", how="inner")
    .drop("state_code")
)

In [108]:
# Overwrite rather than append: the inputs are rebuilt from the full source on
# every run, so appending would add a duplicate copy of every row per re-run.
# inferSchema is a CSV read option that the writer ignores; only header applies here.
immigration_demographic.write.partitionBy("year", "month").mode("overwrite").options(header = True).csv("s3a://capstonekndend/csv/lake/immigration_demographic/")