In [1]:
import os, re
import configparser
import pandas as pd
from datetime import timedelta, datetime
from pyspark.sql import SparkSession, SQLContext, GroupedData
from pyspark.sql.types import StructField, StructType, IntegerType, DoubleType
from pyspark.sql.functions import *
from pyspark.sql.functions import date_add as d_add
from pyspark.sql.types import DoubleType
INPUT_FORMAT = 'csv'

def create_spark_session():
    """
    This function creates a session with Spark, the entry point to programming Spark with the Dataset and DataFrame API.
    """
    os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
    os.environ["PATH"] = "/opt/conda/bin:/opt/spark-2.4.3-bin-hadoop2.7/bin:/opt/conda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/lib/jvm/java-8-openjdk-amd64/bin"
    os.environ["SPARK_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
    os.environ["HADOOP_HOME"] = "/opt/spark-2.4.3-bin-hadoop2.7"
    spark = SparkSession.builder.getOrCreate()
    return spark

spark = create_spark_session()

In [3]:
#Load immigration data and rename default columns
immigration = spark.read.load('immigration_data_sample.csv', format=INPUT_FORMAT, header=True).select('*')\
    .withColumnRenamed('cicid','ID')\
    .withColumnRenamed('i94yr','year')\
    .withColumnRenamed('i94mon','month')\
    .withColumnRenamed('i94port','port_of_admission')\
    .withColumnRenamed('i94mode','mode_of_trans')\
    .withColumnRenamed('i94addr','arrival_state')\
    .withColumnRenamed('i94visa','visa_code')\
    .withColumnRenamed('biryear','birth_year')\
    .withColumnRenamed('i94bir','birth_city')
 
#Drop not useful columns
not_useful = ["count", "entdepa", "entdepd", "matflag", "fltno", "dtaddto", "admnum", "i94bir", "dtadfile", "visapost", "occup", "entdepu", "insnum"]
immigration = immigration.drop(*not_useful)

# drop null records from i94addr column
immigration = immigration.na.drop(subset=["arrival_state"])
#Change I94MODE code to string
transport_lookup = spark.read.load("lookup/I94MODE.csv", format="csv", columns="*", header=True).select('*').withColumnRenamed('ID', 'ID_TMP')
immigration = immigration.withColumn("mode_of_trans",col("mode_of_trans").cast("int"))
immigration = immigration.join(transport_lookup, transport_lookup["ID_TMP"] == immigration['mode_of_trans'], "left")
immigration = immigration.drop('ID_TMP', 'mode_of_trans')

#Cast to int
immigration = immigration.withColumn("year",col("year").cast("int"))\
    .withColumn("month",col("month").cast("int"))\
    .withColumn("ID",col("ID").cast("int"))\
    .withColumn("birth_year",col("birth_year").cast("int"))

#Decode I94VISA
visa_lookup = spark.read.load("lookup/I94VISA.csv", format="csv", columns="*", header=True).select('*').withColumnRenamed('ID', 'ID_TMP')\
.withColumnRenamed('Type', 'visa')
immigration = immigration.withColumn("visa_code",col("visa_code").cast("int"))
immigration = immigration.join(visa_lookup, visa_lookup["ID_TMP"] == immigration['visa_code'], "left")
immigration = immigration.drop('ID_TMP', 'visa_code')

#Convert SAS datatype to dataframe
immigration = immigration.withColumn("base_sas_date", to_date(lit("01/01/1960"), "MM/dd/yyyy")) \
            .withColumn("arrival_date", expr("date_add(base_sas_date, arrdate)")) \
            .withColumn("departure_date", expr("date_add(base_sas_date, depdate)")) \
            .drop("base_sas_date", "arrdate", "depdate")

#Decode I94CITY
city_lookup = spark.read.load("lookup/I94CIT_I94RES.csv", format="csv", columns="*", header=True).select('*')
immigration = immigration.withColumn("i94cit",col("i94cit").cast("int"))\
    .withColumn("i94res",col("i94res").cast("int"))

immigration = immigration.join(city_lookup, city_lookup["Code"] == immigration['i94cit'], "left")\
    .withColumnRenamed('I94CTRY', 'native_country')\
    .drop('i94cit', 'Code')

immigration = immigration.join(city_lookup, city_lookup["Code"] == immigration['i94res'], "left")\
    .withColumnRenamed('I94CTRY', 'residence_country')\
    .drop('i94res', 'Code')

immigration.head()


root
 |-- _c0: string (nullable = true)
 |-- ID: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- port_of_admission: string (nullable = true)
 |-- arrival_state: string (nullable = true)
 |-- birth_city: string (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- visatype: string (nullable = true)
 |-- Mode: string (nullable = true)
 |-- visa: string (nullable = true)
 |-- arrival_date: date (nullable = true)
 |-- departure_date: date (nullable = true)
 |-- native_country: string (nullable = true)
 |-- residence_country: string (nullable = true)



In [4]:
demographics = spark.read.load('us-cities-demographics.csv', format=INPUT_FORMAT, sep=';', header=True).select('*')\
    .withColumnRenamed('Median Age', 'Median_age')\
    .withColumnRenamed('Male Population', 'Male_population')\
    .withColumnRenamed('Female Population', 'Female_population')\
    .withColumnRenamed('Total Population', 'Total_population')\
    .withColumnRenamed('Number of Veterans', 'Number_of_veterans')\
    .withColumnRenamed('Average Household Size', 'Average_household_size')\
    .withColumnRenamed('State Code', 'State_code')\
    .withColumn("Male_population",col("Male_population").cast("int"))\
    .withColumn("Female_population",col("Female_population").cast("int"))\
    .withColumn("Total_population",col("Total_population").cast("int"))\
    .withColumn("Number_of_veterans",col("Number_of_veterans").cast("int"))\
    .withColumn("Foreign-born",col("Foreign-born").cast("int"))\
    .withColumn("Count",col("Count").cast("int"))

#Drop null values
demographics = demographics.na.drop(subset=['Male_population', 'Female_population', 'Number_of_veterans', 'Foreign-born', 'Average_household_size' ])

demographics.head()
demographics.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median_age: string (nullable = true)
 |-- Male_population: integer (nullable = true)
 |-- Female_population: integer (nullable = true)
 |-- Total_population: integer (nullable = true)
 |-- Number_of_veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average_household_size: string (nullable = true)
 |-- State_code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



In [6]:
temp = spark.read.load('../../data2/GlobalLandTemperaturesByCity.csv', format=INPUT_FORMAT, header=True).select('*')

#Get avg temp by country
temp = temp.groupby(["Country"]).agg({"AverageTemperature": "avg", "Latitude": "first", "Longitude": "first"})\
    .withColumnRenamed('avg(AverageTemperature)', 'Temperature')\
    .withColumnRenamed('first(Latitude)', 'Latitude')\
    .withColumnRenamed('first(Longitude)', 'Longitude')

#make country caps
temp = temp.withColumn("Country",upper(col("Country")))
temp.head()
temp.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [7]:
#airport: city -> states\
airports = spark.read.load('airport-codes_csv.csv', format=INPUT_FORMAT, header=True).select('*')

airports = airports.filter((airports.iso_country == 'US'))

airports = airports.withColumn('iso_region', col('iso_region').substr(4,10)).withColumnRenamed('iso_region', 'state')
airports.head()
airports.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- state: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)

