I am using this Notebook to explore and implement ways to build the ETL jobs
- Identify the different dimensional tables, including their attributes
- Identify the fact tables and its attributes
- Usage of Pyspark to parse the Immigrations data
- Usage of plain old Python to parse the data dictionary

Since this data was purchased, and its has a monthly frequency, it is better suited for a one-off
analysis. Hence an Analytical notebook is appropriate;

Each of the dimensional and fact data will be a dataframe, which will written ito a parquet/csv file format
and stored in S3 buckets;

It can be queried from there via a Notebook or using Athena/Big Query

In [408]:
import pandas as pd
import numpy as np
import os
import re
from datetime import datetime, timedelta
from functools import reduce
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.functions import date_add
from pyspark.sql.types import (StructType as R,
                               StructField as Fld, DoubleType as Dbl, StringType as Str,
                               IntegerType as Int, DateType as Date, TimestampType as TimeStamp
                              )
# https://stackoverflow.com/questions/31841509/pyspark-exception-java-gateway-process-exited-before-sending-the-driver-its-po

In [2]:
spark = SparkSession.builder\
                    .config("spark.jars.packages", "saurfang:spark-sas7bdat:3.0.0-s_2.12") \
                    .enableHiveSupport()\
                    .getOrCreate()

In [3]:
#https://datascience.stackexchange.com/questions/11356/merging-multiple-data-frames-row-wise-in-pyspark
def append_df(*dfs):
    return reduce(lambda df1, df2: df1.unionByName(df2), dfs)

In [4]:
def concat_df(*dfs):
    return reduce(DataFrame.unionAll, dfs)

In [6]:
!pwd

/Users/home/Documents/Data Engineering ND/Capstone/notebook


In [8]:
path = '../data/18-83510-I94-Data-2016'
files = ['i94_apr16_sub.sas7bdat', 'i94_aug16_sub.sas7bdat']
dfs = []
for file in files:
    df = spark.read.format('com.github.saurfang.sas.spark')\
                .load(os.path.join(path, file))
    dfs.append(df)

In [306]:
df = concat_df(*dfs)

In [15]:
type(df)

pyspark.sql.dataframe.DataFrame

In [17]:
df.count()

7199883

In [89]:
df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

## Build Immigration table - i94_immigrations

In [382]:
def sas_date_converter(row, base_date='1960-01-01'):
    if row is None:
        return row
    return datetime.strptime(base_date, '%Y-%m-%d') + timedelta(int(row))

In [383]:
# https://stackoverflow.com/questions/57762685/is-there-a-way-to-use-pyspark-sql-functions-date-add-with-a-colcolumn-name-a
sas_date_converter2 = F.udf(lambda x: None if x!= x else datetime.strptime('1960-01-01', "%Y-%m-%d") + timedelta(x), Date())



In [384]:
spark.udf.register('SASDateConverter', sas_date_converter, Date())

<function __main__.sas_date_converter(row, base_date='1960-01-01')>

In [262]:
# Ignores junk values, which remain asis
change_date_format_1 = F.udf(lambda x: datetime.strptime(x.strip(), '%Y%m%d'), Date())

In [263]:
change_date_format_2 = F.udf(lambda x: datetime.strptime(x.strip(), '%m%d%Y'), Date())

In [302]:
def date_change(row):
    if row is None:
        return None
    if row.endswith('2016'):
        return datetime.strptime(row, '%m%d%Y')
    if row.startswith('2016'):
        return datetime.strptime(row, '%Y%m%d')
    return None

In [307]:
dt = F.udf(date_change, Date())

In [602]:
cols = ['cicid', 'i94yr', 'i94mon', 'i94port', 'i94mode', 'visapost', 
       'entdepa', 'entdepd', 'entdepu', 'matflag', 
       'dtadfile', 'dtaddto', 'arrdate']
immigrations = (
    df.select(cols)
        .dropDuplicates()
        .withColumn('custom_client_id', df['cicid'].cast(Int()).cast(Str())).drop('cicid')
        .withColumn('i94_year', df['i94yr'].cast(Int())).drop('i94yr')
        .withColumn('i94_month', df['i94mon'].cast(Int())).drop('i94mon')
        .withColumnRenamed('i94port', 'i94_port')
        .withColumn('mode_of_entry', df['i94mode'].cast(Int())).drop('i94mode')
        .withColumnRenamed('visapost', 'visa_post')
        .withColumnRenamed('entdepa', 'arrival_flag')
        .withColumnRenamed('entdepd', 'depature_flag')
        .withColumnRenamed('entdepu', 'update_flag')
        .withColumnRenamed('matflag', 'match_flag')
#        .withColumn('i94_entry_date', F.to_date('dtadfile', 'yyyymmdd').cast(Date()))
        .withColumn('i94_entry_date', dt('dtadfile'))
        .drop(F.col('dtadfile'))
        .withColumn('i94_valid_till', dt('dtaddto'))
        .drop(F.col('dtaddto'))
 #       .withColumn('x', sas_date_converter2(F.col('arrdate')))
)
    

In [603]:
immigrations.printSchema()

root
 |-- i94_port: string (nullable = true)
 |-- visa_post: string (nullable = true)
 |-- arrival_flag: string (nullable = true)
 |-- depature_flag: string (nullable = true)
 |-- update_flag: string (nullable = true)
 |-- match_flag: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- custom_client_id: string (nullable = true)
 |-- i94_year: integer (nullable = true)
 |-- i94_month: integer (nullable = true)
 |-- mode_of_entry: integer (nullable = true)
 |-- i94_entry_date: date (nullable = true)
 |-- i94_valid_till: date (nullable = true)



In [604]:
immigrations.limit(5).toPandas().head()

Unnamed: 0,i94_port,visa_post,arrival_flag,depature_flag,update_flag,match_flag,arrdate,custom_client_id,i94_year,i94_month,mode_of_entry,i94_entry_date,i94_valid_till
0,NEW,,T,O,,M,20545.0,161,2016,4,1,2016-04-01,2016-09-30
1,CHI,,O,O,,M,20545.0,498,2016,4,1,2016-04-01,2016-06-29
2,CHI,,G,O,,M,20545.0,512,2016,4,1,2016-04-01,2016-06-29
3,ATL,,G,O,,M,20545.0,790,2016,4,1,2016-04-01,2016-06-29
4,NEW,,G,O,,M,20545.0,991,2016,4,1,2016-04-01,2016-06-29


In [318]:
immigrations.agg({'i94_entry_date': 'max'}).show()

+-------------------+
|max(i94_entry_date)|
+-------------------+
|         2016-11-21|
+-------------------+



In [204]:
df.agg({'dtaddto': 'min'}).show()

+------------+
|min(dtaddto)|
+------------+
|    /   183D|
+------------+



In [315]:
immigrations.agg({'i94_valid_till': 'min'}).show()

+-------------------+
|min(i94_valid_till)|
+-------------------+
|         2016-01-01|
+-------------------+



In [246]:
# l1 = [(5748517.0,'1960-01-01', '20160401'), (5748517.0,'1960-01-01', '2016051'), (5748517.0,'1960-01-01', '/   183D')]
# df_test = spark.createDataFrame(l1).toDF('cic_id','sas_date','arrival_date')
# df_test.show()

+---------+----------+------------+
|   cic_id|  sas_date|arrival_date|
+---------+----------+------------+
|5748517.0|1960-01-01|    20160401|
|5748517.0|1960-01-01|     2016051|
|5748517.0|1960-01-01|    /   183D|
+---------+----------+------------+



In [248]:
#df_test.withColumn('actual_arrival_date', date_add_udf(F.to_date('sas_date'), 'arrival_date')).show()

## Build transaction table - i94_visitors

In [390]:
df.createOrReplaceTempView('i94_visitors')
txns = spark.sql("""
    SELECT
        STRING(INT(cicid)) AS custom_client_id,
        STRING(INT(admnum)) AS admissions_number,
        INT(i94yr) AS i94_year,
        INT(i94mon) AS i94_month,
        STRING(INT(i94cit)) AS arrived_country_id,
        STRING(INT(i94res)) AS resident_country_id,
        SASDateConverter(arrdate) AS arrival_date,
        SASDateConverter(depdate) AS depature_date,
        STRING(fltno) AS flight_num,
        STRING(INT(i94visa)) AS visa_id,
        STRING(visatype) AS visa_category
    FROM
        i94_visitors
""")

In [391]:
txns.limit(20).toPandas().head(10)

Unnamed: 0,custom_client_id,admissions_number,i94_year,i94_month,arrived_city,resident_city,arrival_date,depature_date,flight_num,travel_purpose,visa_category
0,6,1897628485,2016,4,692,692,2016-04-29,,,2,B2
1,7,2147483647,2016,4,254,276,2016-04-07,,296.0,3,F1
2,15,666643185,2016,4,101,101,2016-04-01,2016-08-25,93.0,2,B2
3,16,2147483647,2016,4,101,101,2016-04-01,2016-04-23,199.0,2,B2
4,17,2147483647,2016,4,101,101,2016-04-01,2016-04-23,199.0,2,B2
5,18,2147483647,2016,4,101,101,2016-04-01,2016-04-11,602.0,1,B1
6,19,2147483647,2016,4,101,101,2016-04-01,2016-04-14,602.0,2,B2
7,20,2147483647,2016,4,101,101,2016-04-01,2016-04-14,602.0,2,B2
8,21,2147483647,2016,4,101,101,2016-04-01,2016-04-09,602.0,2,B2
9,22,2147483647,2016,4,101,101,2016-04-01,2016-04-18,608.0,1,B1


In [348]:
# Check for null values in a field
df.select(F.count(F.when(F.col('arrdate').isNull(), 'arrdate'))).show()

+---------------------------------------------------+
|count(CASE WHEN (arrdate IS NULL) THEN arrdate END)|
+---------------------------------------------------+
|                                                  0|
+---------------------------------------------------+



## Build Flight dimensional table - A reference table i94_flights

In [395]:
df.createOrReplaceTempView('i94_flights')
airlines = spark.sql("""
    SELECT
        STRING(fltno) AS flight_num,
        airline
    FROM
        i94_flights
    WHERE 
        fltno IS NOT NULL
""")

In [396]:
airlines.show()

+----------+-------+
|flight_num|airline|
+----------+-------+
|     00296|   null|
|        93|     OS|
|     00199|     AA|
|     00199|     AA|
|     00602|     AZ|
|     00602|     AZ|
|     00602|     AZ|
|     00602|     AZ|
|     00608|     AZ|
|     00001|     TK|
|     03348|     MQ|
|     00422|     LH|
|     00422|     LH|
|     00614|     AZ|
|     00089|     OS|
|     00089|     OS|
|     00033|     TK|
|     00602|     AZ|
|         1|     TK|
|     00001|     TK|
+----------+-------+
only showing top 20 rows



## Build Visa dimensional table - A reference table i94_visa

This data comes from a data dictionary file
- i94_visa (visa_codes)
- i94_visa_category
- i94_arrived_city (city_codes)
- i94_resident_city (city_codes)
- i94_mode (mode_codes)
- i94_port (port_codes)
- i94_addr (state_codes)

In [471]:
i94_ref_file = r'../dictionary/I94_SAS_Labels_Descriptions.SAS'
def parse_ref_file(file, start_pos=2, end_pos=7):
    data = []
    ref_dict = {}
    with open(file) as f:
        content = f.read()
        data = [word for lines in content.split(';')[start_pos: end_pos] for word in lines.splitlines(True) if '=' in word]
        for item in data:
            k = item.split('=')[0].strip().strip("'")
            v = item.split('=')[1].strip().strip("'") 
            if k not in ref_dict:
                ref_dict[k] = v
    return ref_dict

In [488]:
(
pd.Series(parse_ref_file(i94_ref_file, 2, 3))
    .to_frame()
    .rename(columns={0: 'country'})
    .reset_index()
    .rename(columns={'index': 'country_id'})
)

Unnamed: 0,country_code,country
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA
...,...,...
284,791,No Country Code (791)
285,849,No Country Code (849)
286,914,No Country Code (914)
287,944,No Country Code (944)


In [506]:
port_df = (
pd.Series(parse_ref_file(i94_ref_file, 3, 4))
    .to_frame()
    .rename(columns={0: 'city'})
    .reset_index()
    .rename(columns={'index': 'port_id'})
)
res = pd.concat([port_df['port_id'].to_frame(), port_df['city'].str.strip().str.rsplit(',', 1, expand=True)], axis=1)

In [507]:
res[1].unique()


array([' AK', ' AL', ' AR (BPS)', ' AR', ' AZ', None, ' CA', ' CA (BPS)',
       ' CO', ' CO #ARPT', ' CT', ' DE', ' FL', ' FL #ARPT', ' GA', ' GU',
       ' HI', ' IA', ' ID', ' IL', ' IN', ' KS', ' KY', ' LA',
       ' LA (BPS)', ' MA', ' WA', ' MD', ' ME', ' MT', ' ME (BPS)', ' MI',
       ' MN', ' MO', ' MS', ' MT (BPS)', ' NC', ' ND', ' NE', ' NH',
       ' NJ', ' NM (BPS)', ' NM', ' NV', ' NY', ' OH', ' OK', ' OR',
       ' PA', ' PR', ' RI', ' SC', ' SC #ARPT', ' SD', ' SPN', ' TN',
       ' TX', ' TX (BPS)', 'TX', ' VI', ' UT', ' VA', ' VA #ARPT', ' VT',
       ' VT (I-91)', ' VT (RT. 5)', ' VT (BP - SECTOR HQ)',
       ' WASHINGTON #INTL', ' WA (BPS)', ' WI', ' WV', ' WY', ' CANADA',
       ' Canada', ' NETHERLANDS', ' NETH ANTILLES', ' THAILAND',
       ' ETHIOPIA', ' PRC', ' BERMUDA', ' COLOMBIA', ' ARGENTINA',
       ' MEXICO', ' BRAZIL', ' URUGUAY', ' IRELAND', ' GABON', ' BAHAMAS',
       ' MX', ' CAYMAN ISLAND', ' SEOUL KOREA', ' JAPAN', ' ROMANIA',
       ' INDONESIA', 

In [568]:
res

Unnamed: 0,port_id,0,1
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,BAR,BAKER AAF - BAKER ISLAND,AK
3,DAC,DALTONS CACHE,AK
4,PIZ,DEW STATION PT LAY DEW,AK
...,...,...,...
655,ADU,No PORT Code (ADU),
656,AKT,No PORT Code (AKT),
657,LIT,No PORT Code (LIT),
658,A2A,No PORT Code (A2A),


In [491]:
(
pd.Series(parse_ref_file(i94_ref_file, 4, 5))
    .to_frame()
    .rename(columns={0: 'mode'})
    .reset_index()
    .rename(columns={'index': 'mode_id'})
)

Unnamed: 0,port_code,city
0,1,Air
1,2,Sea
2,3,Land
3,9,Not reported


In [493]:
(
pd.Series(parse_ref_file(i94_ref_file, 5, 6))
    .to_frame()
    .rename(columns={0: 'state_id'})
    .reset_index()
    .rename(columns={'index': 'state'})
).head()

Unnamed: 0,state,state_id
0,AL,ALABAMA
1,AK,ALASKA
2,AZ,ARIZONA
3,AR,ARKANSAS
4,CA,CALIFORNIA


In [494]:
(
pd.Series(parse_ref_file(i94_ref_file, 6, 7))
    .to_frame()
    .rename(columns={0: 'visa_id'})
    .reset_index()
    .rename(columns={'index': 'purpose'})
).head()

Unnamed: 0,state,state_id
0,1,Business
1,2,Pleasure
2,3,Student


In [497]:
'GEN M. ESCOBEDO, Monterrey, MX'.rsplit(',', 1)

['GEN M. ESCOBEDO, Monterrey', ' MX']

## Build Airport dimensional table - airports

In [508]:
airport_file = r'../data/airport-codes.csv'
airports = pd.read_csv(airport_file)

In [509]:
airports.shape

(55075, 12)

In [510]:
airports.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [511]:
airports['iso_region'].unique()

array(['US-PA', 'US-KS', 'US-AK', ..., 'MN-073', 'MN-1', 'MN-046'],
      dtype=object)

In [514]:
airports[airports['iso_region'].str.contains(r'^US')]['iso_region'].unique()

array(['US-PA', 'US-KS', 'US-AK', 'US-AL', 'US-AR', 'US-OK', 'US-AZ',
       'US-CA', 'US-CO', 'US-FL', 'US-GA', 'US-HI', 'US-ID', 'US-IN',
       'US-IL', 'US-KY', 'US-LA', 'US-MD', 'US-MI', 'US-MN', 'US-MO',
       'US-MT', 'US-NJ', 'US-NC', 'US-NY', 'US-OH', 'US-OR', 'US-SC',
       'US-SD', 'US-TX', 'US-TN', 'US-UT', 'US-VA', 'US-WA', 'US-WI',
       'US-WV', 'US-WY', 'US-CT', 'US-IA', 'US-MA', 'US-ME', 'US-NE',
       'US-NH', 'US-NM', 'US-NV', 'US-MS', 'US-ND', 'US-VT', 'US-RI',
       'US-DC', 'US-DE', 'US-U-A'], dtype=object)

In [524]:
airports.dtypes

ident            object
type             object
name             object
elevation_ft    float64
continent        object
iso_country      object
iso_region       object
municipality     object
gps_code         object
iata_code        object
local_code       object
coordinates      object
dtype: object

In [530]:
# should be able to directly read it into a Spark DF
airports_spark = spark.read.option('header', True) \
                            .csv(airport_file)

In [531]:
airports_spark.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [538]:
airports_spark.createOrReplaceTempView('airports')
airports_sql = spark.sql("""
    SELECT
        STRING(ident) AS airport_id,
        type AS airport_type,
        name AS airpot_name,
        elevation_ft,
        continent,
        iso_country,
        iso_region,
        CASE WHEN iso_region LIKE 'US-%' THEN SPLIT(iso_region, '-')[1] ELSE NULL END AS us_cities,
        municipality,
        gps_code,
        iata_code,
        local_code,
        CAST(SPLIT(coordinates, ',')[0] AS DOUBLE) AS latitude,
        CAST(SPLIT(coordinates, ',')[1] AS DOUBLE) AS longitude
    FROM
        airports
""")

In [539]:
airports_sql.toPandas().head()

Unnamed: 0,airport_id,airport_type,airpot_name,elevation_ft,continent,iso_country,iso_region,us_cities,municipality,gps_code,iata_code,local_code,latitude,longitude
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,PA,Bensalem,00A,,00A,-74.933601,40.070801
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,KS,Leoti,00AA,,00AA,-101.473911,38.704022
2,00AK,small_airport,Lowell Field,450,,US,US-AK,AK,Anchor Point,00AK,,00AK,-151.695999,59.9492
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,AL,Harvest,00AL,,00AL,-86.770302,34.864799
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,AR,Newport,,,,-91.254898,35.6087


## US Demographic dimension table

In [543]:
us_demo_file = r'../data/us-cities-demographics.csv'
us_demo = pd.read_csv(us_demo_file, sep=';')

In [544]:
us_demo.shape

(2891, 12)

In [545]:
us_demo.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [549]:
us_demo_spark = spark.read.options(header='True', delimiter=';') \
                            .csv(us_demo_file)

In [550]:
us_demo_spark.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [560]:
us_demo_spark.createOrReplaceTempView('us_states_demographics')
states_sql = spark.sql("""
    SELECT
        State AS state,
        DOUBLE(`Median Age`) AS median_age,
        INT(`Male Population`) AS male_population,
        INT(`Female Population`) AS female_population,
        INT(`Number of Veterans`) AS num_veterans,
        INT(`Foreign-born`) AS num_foreign_born,
        DOUBLE(`Average Household Size`) AS avg_household_size,
        `State Code` AS state_code
    FROM
        us_states_demographics
""")

In [561]:
states_sql.toPandas().head()

Unnamed: 0,state,median_age,male_population,female_population,num_veterans,num_foreign_born,avg_household_size,state_code
0,Maryland,33.8,40601.0,41862.0,1562.0,30908.0,2.6,MD
1,Massachusetts,41.0,44129.0,49500.0,4147.0,32935.0,2.39,MA
2,Alabama,38.5,38040.0,46799.0,4819.0,8229.0,2.58,AL
3,California,34.5,88127.0,87105.0,5821.0,33878.0,3.18,CA
4,New Jersey,34.6,138040.0,143873.0,5829.0,86253.0,2.73,NJ


In [605]:
states_sql.write.mode('overwrite').csv('test')

In [574]:
us_demo_spark.createOrReplaceTempView('us_cities_demographics')
cities_sql = spark.sql("""
    SELECT
        UPPER(City) AS city,
        Race AS race,
        INT(Count) AS population
    FROM
        us_cities_demographics
""")

In [575]:
cities_sql.toPandas().head()

Unnamed: 0,city,race,population
0,SILVER SPRING,Hispanic or Latino,25924
1,QUINCY,White,58723
2,HOOVER,Asian,4759
3,RANCHO CUCAMONGA,Black or African-American,24437
4,NEWARK,White,76402


In [578]:
cities_sql.filter(F.col('city') == 'DALTONS CACHE').show()

+----+----+----------+
|city|race|population|
+----+----+----------+
+----+----+----------+



## Date dimension

In [599]:
date = (
        txns.select(
                    F.col('arrival_date'), 
                    F.year('arrival_date').alias('year'),
                    F.month('arrival_date').alias('month'),
                    F.dayofmonth('arrival_date').alias('day'),
                    F.dayofweek('arrival_date').alias('dayofweek'),
                    F.when((F.dayofweek('arrival_date') == 1) | 
                           (F.dayofweek('arrival_date') == 7), 'weekend').otherwise('weekday').alias('is_weekend')
        )
)

In [600]:

#     F.expr("case when F.dayofweek('arrival_date') == 1 or \
#                            F.dayofweek('arrival_date') == 7 then weekend else weekday end").alias('is_weekend')

In [601]:
date.show()

+------------+----+-----+---+---------+----------+
|arrival_date|year|month|day|dayofweek|is_weekend|
+------------+----+-----+---+---------+----------+
|  2016-04-29|2016|    4| 29|        6|   weekday|
|  2016-04-07|2016|    4|  7|        5|   weekday|
|  2016-04-01|2016|    4|  1|        6|   weekday|
|  2016-04-01|2016|    4|  1|        6|   weekday|
|  2016-04-01|2016|    4|  1|        6|   weekday|
|  2016-04-01|2016|    4|  1|        6|   weekday|
|  2016-04-01|2016|    4|  1|        6|   weekday|
|  2016-04-01|2016|    4|  1|        6|   weekday|
|  2016-04-01|2016|    4|  1|        6|   weekday|
|  2016-04-01|2016|    4|  1|        6|   weekday|
|  2016-04-01|2016|    4|  1|        6|   weekday|
|  2016-04-01|2016|    4|  1|        6|   weekday|
|  2016-04-01|2016|    4|  1|        6|   weekday|
|  2016-04-01|2016|    4|  1|        6|   weekday|
|  2016-04-01|2016|    4|  1|        6|   weekday|
|  2016-04-01|2016|    4|  1|        6|   weekday|
|  2016-04-01|2016|    4|  1|  