### Data sources
- LEHD Origin-Destination Employment Statistics (LODES): Variable codes and dataset definitions are described in the [LODES 7.3 Technical Documentation](https://lehd.ces.census.gov/data/lodes/LODES7/LODESTechDoc7.3.pdf). All LODES data described there are available, and no changes have been made to the original CSV files. Data cover 2002 to 2015. See the documentation for caveats.
- Driving Times and Distances Dataset: Census tracts are 2010 vintage, and the columns are the origin tract, destination tract, travel distance in miles, and travel time in minutes. These data were calculated by the Data Science team at the Urban Institute. See the [GitHub repo](https://github.com/UI-Research/spark-osrm).

In [1]:
import warnings

import pyspark
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, StringType
from IPython.core.interactiveshell import InteractiveShell

# Show each distinct warning only once
warnings.filterwarnings(action='once')
InteractiveShell.ast_node_interactivity = "all"

In [2]:
spark = SparkSession.builder \
    .appName('pyspark-exploration') \
    .config('spark.driver.cores', '2') \
    .config('spark.executor.memory', '8gb') \
    .config('spark.executor.cores', '2') \
    .getOrCreate()     

In [3]:
def debug(df):
    """
    Function to pretty print the toDebugString
    """
    for rddstring in df.rdd.toDebugString().split('\n'):
        print(rddstring.strip())

### Load and prepare data

Load in data and see what it looks like

In [4]:
drive = spark.read.parquet('s3://lsdm-emr-util/lsdm-data/travel-times/drive_times.parquet')
od = spark.read.parquet('s3://lsdm-emr-util/lsdm-data/lodes/od/od.parquet')

In [5]:
print((drive.count(), len(drive.columns)))
drive.take(2)
drive.dtypes

(122004331, 4)


[Row(from_tract=u'36103146402', to_tract=u'42091207003', miles=141.2, minutes=184.1),
 Row(from_tract=u'36103146402', to_tract=u'42091209000', miles=162.7, minutes=203.4)]

[('from_tract', 'string'),
 ('to_tract', 'string'),
 ('miles', 'double'),
 ('minutes', 'double')]

In [6]:
print((od.count(), len(od.columns)))
od.take(2)
od.dtypes

(1577789908, 14)


[Row(w_geocode=u'271630714002025', h_geocode=u'271630712082020', s000=1, sa01=0, sa02=1, sa03=0, se01=0, se02=1, se03=0, si01=1, si02=0, si03=0, createdate=u'20160219', year=2012),
 Row(w_geocode=u'271630714002025', h_geocode=u'271630712083004', s000=1, sa01=0, sa02=1, sa03=0, se01=0, se02=1, se03=0, si01=1, si02=0, si03=0, createdate=u'20160219', year=2012)]

[('w_geocode', 'string'),
 ('h_geocode', 'string'),
 ('s000', 'int'),
 ('sa01', 'int'),
 ('sa02', 'int'),
 ('sa03', 'int'),
 ('se01', 'int'),
 ('se02', 'int'),
 ('se03', 'int'),
 ('si01', 'int'),
 ('si02', 'int'),
 ('si03', 'int'),
 ('createdate', 'string'),
 ('year', 'int')]
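
For reference, the count columns above can be read with a small lookup table. The labels below are paraphrased from memory of the LODES technical documentation and should be checked against it; the names `LODES_OD_COLUMNS` and `describe` are introduced here only for illustration.

```python
# Paraphrased labels for the LODES OD count columns; verify the exact
# wording and thresholds against the LODES technical documentation.
LODES_OD_COLUMNS = {
    "s000": "total number of jobs",
    "sa01": "jobs, workers age 29 or younger",
    "sa02": "jobs, workers age 30 to 54",
    "sa03": "jobs, workers age 55 or older",
    "se01": "jobs, earnings $1250/month or less",
    "se02": "jobs, earnings $1251/month to $3333/month",
    "se03": "jobs, earnings greater than $3333/month",
    "si01": "jobs in Goods Producing industry sectors",
    "si02": "jobs in Trade, Transportation, and Utilities industry sectors",
    "si03": "jobs in All Other Services industry sectors",
}

def describe(column):
    """Return a readable label for a column, or the name itself if unknown."""
    return LODES_OD_COLUMNS.get(column, column)

# First sample row from the od.take(2) output above.
sample = {"s000": 1, "sa01": 0, "sa02": 1, "sa03": 0,
          "se01": 0, "se02": 1, "se03": 0,
          "si01": 1, "si02": 0, "si03": 0}

# Add up each category set and compare it with the total job count.
subtotals = {}
for prefix in ("sa", "se", "si"):
    subtotals[prefix] = (sample[prefix + "01"] + sample[prefix + "02"]
                         + sample[prefix + "03"])
    print(prefix, subtotals[prefix], subtotals[prefix] == sample["s000"])
```

In this sample row each category set adds up to `s000`, which is what one would expect if the three categories in a set partition the total job count.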

Make census tract and state columns in origin-destination data

In [7]:
od = od.withColumn('h_tract', substring(od.h_geocode, 0, 11))\
        .withColumn('w_tract', substring(od.w_geocode, 0, 11))

In [8]:
od = od.withColumn('h_state', substring(od.h_geocode, 0, 2))\
        .withColumn('w_state', substring(od.w_geocode, 0, 2))

In [9]:
od.take(1)

[Row(w_geocode=u'271630714002025', h_geocode=u'271630712082020', s000=1, sa01=0, sa02=1, sa03=0, se01=0, se02=1, se03=0, si01=1, si02=0, si03=0, createdate=u'20160219', year=2012, h_tract=u'27163071208', w_tract=u'27163071400', h_state=u'27', w_state=u'27')]
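
The substring calls above rely on the fixed-width layout of a 15-character census block GEOID: 2 characters of state FIPS code, then 3 of county, 6 of tract, and 4 of block. Below is a minimal pure-Python sketch of the same slicing, with `split_geocode` and `GeoParts` as hypothetical names. Note that Python slices are 0-based, whereas Spark's `substring` positions are 1-based.

```python
from collections import namedtuple

GeoParts = namedtuple("GeoParts", ["state", "county", "tract", "block"])

def split_geocode(geocode):
    """Split a 15-character block GEOID into cumulative identifiers.

    Each field is the full prefix up to that level, so `tract` is the
    11-character tract GEOID used as the join key in this notebook.
    """
    if len(geocode) != 15:
        raise ValueError("expected a 15-character block GEOID")
    return GeoParts(
        state=geocode[:2],    # state FIPS code
        county=geocode[:5],   # state + county
        tract=geocode[:11],   # state + county + tract
        block=geocode,        # full block identifier
    )

parts = split_geocode("271630714002025")
print(parts.state, parts.tract)
```

Keeping the geocodes as strings preserves leading zeros, such as the `06` prefix for California.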

Make state column in drive data

In [10]:
drive = drive.withColumn('from_state', substring(drive.from_tract, 0, 2))\
            .withColumn('to_state', substring(drive.to_tract, 0, 2))

Make "total" and "pct" columns

In [11]:
sa_cols = ['sa01', 'sa02', 'sa03']
se_cols = ['se01', 'se02', 'se03']
si_cols = ['si01', 'si02', 'si03']

In [12]:
od = od.withColumn('sa_total', od.sa01+od.sa02+od.sa03)\
    .withColumn('se_total', od.se01+od.se02+od.se03)\
    .withColumn('si_total', od.si01+od.si02+od.si03)

In [13]:
for cat_ls, cat_name in [(sa_cols, 'sa_total'), (se_cols, 'se_total'), (si_cols, 'si_total')]:
    # Use `c` rather than `col` so the loop variable does not shadow pyspark.sql.functions.col
    for c in cat_ls:
        new = c + '_pct'
        od = od.withColumn(new, od[c]/od[cat_name])

In [14]:
od.columns

['w_geocode',
 'h_geocode',
 's000',
 'sa01',
 'sa02',
 'sa03',
 'se01',
 'se02',
 'se03',
 'si01',
 'si02',
 'si03',
 'createdate',
 'year',
 'h_tract',
 'w_tract',
 'h_state',
 'w_state',
 'sa_total',
 'se_total',
 'si_total',
 'sa01_pct',
 'sa02_pct',
 'sa03_pct',
 'se01_pct',
 'se02_pct',
 'se03_pct',
 'si01_pct',
 'si02_pct',
 'si03_pct']
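
Each `_pct` column is a category count divided by the total for its set. A pure-Python sketch of the same arithmetic follows; `category_shares` is a hypothetical helper, and returning `None` for a zero total is an assumption meant to mirror Spark SQL's default (non-ANSI) behaviour of yielding null on division by zero.

```python
def category_shares(row, cols):
    """Return {col + '_pct': share} for one row; shares are None if the total is 0."""
    total = 0
    for c in cols:
        total += row[c]
    shares = {}
    for c in cols:
        # Guard against an empty category set instead of raising ZeroDivisionError.
        shares[c + "_pct"] = (row[c] / float(total)) if total else None
    return shares

# Industry counts from a single origin-destination row.
row = {"si01": 0, "si02": 0, "si03": 1}
shares = category_shares(row, ["si01", "si02", "si03"])
print(shares)
```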

Check out lineage and partitions of the two dataframes

In [15]:
debug(od)

(137) MapPartitionsRDD[28] at javaToPython at NativeMethodAccessorImpl.java:0 []
|   MapPartitionsRDD[27] at javaToPython at NativeMethodAccessorImpl.java:0 []
|   MapPartitionsRDD[26] at javaToPython at NativeMethodAccessorImpl.java:0 []
|   FileScanRDD[25] at javaToPython at NativeMethodAccessorImpl.java:0 []


In [16]:
debug(drive)

(33) MapPartitionsRDD[32] at javaToPython at NativeMethodAccessorImpl.java:0 []
|   MapPartitionsRDD[31] at javaToPython at NativeMethodAccessorImpl.java:0 []
|   MapPartitionsRDD[30] at javaToPython at NativeMethodAccessorImpl.java:0 []
|   FileScanRDD[29] at javaToPython at NativeMethodAccessorImpl.java:0 []


In [17]:
od.rdd.getNumPartitions()
drive.rdd.getNumPartitions()

137

33
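
The number in parentheses at the start of the first `toDebugString` line is the partition count, which is why it matches the `getNumPartitions()` result. Below is a small sketch that extracts it from the printed lineage; `partitions_from_lineage` is a hypothetical helper, and the regular expression assumes the output format shown above.

```python
import re

def partitions_from_lineage(first_line):
    """Parse the leading '(N)' partition count from a lineage line."""
    match = re.match(r"\((\d+)\)", first_line.strip())
    return int(match.group(1)) if match else None

line = "(137) MapPartitionsRDD[28] at javaToPython at NativeMethodAccessorImpl.java:0 []"
count = partitions_from_lineage(line)
print(count)
```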

### Join: origin-destination and driving dataframes  
Assumption: the travel time and distance between two census tracts apply to every pair of census blocks within those tracts.

Repartition od before joining



In [18]:
od = od.repartition(8)

Try working with just California data first, where origin and destination are state code 06  
Join od_ca and drive_ca

In [19]:
od_ca = od.where("h_state = '06' AND w_state = '06'")

In [20]:
drive_ca = drive.where("to_state = '06' AND from_state = '06'")
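
One detail worth checking when filtering on two columns: Python's `and` applied to two non-empty strings evaluates to the second string, so a filter built that way only ever sees one condition. Combining the conditions inside a single SQL expression avoids this. The sketch below is plain Python and only demonstrates the string behaviour; `both_states` is a hypothetical helper.

```python
# `and` returns its second operand when the first is truthy,
# so only the second condition survives.
combined = "h_state == 06" and "w_state == 06"
print(combined)

def both_states(home_col, work_col, fips):
    """Build one SQL filter string that tests both columns against a FIPS code."""
    return "{0} = '{2}' AND {1} = '{2}'".format(home_col, work_col, fips)

condition = both_states("h_state", "w_state", "06")
print(condition)
```

Quoting the FIPS code keeps the comparison between strings, so the leading zero is not lost to a numeric cast.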

In [21]:
df_ca = od_ca.join(drive_ca, [drive_ca.from_tract == od_ca.h_tract, drive_ca.to_tract == od_ca.w_tract])

In [22]:
df_ca.limit(1).collect()

[Row(w_geocode=u'060014355001011', h_geocode=u'060014001001036', s000=1, sa01=1, sa02=0, sa03=0, se01=0, se02=1, se03=0, si01=0, si02=0, si03=1, createdate=u'20170919', year=2015, h_tract=u'06001400100', w_tract=u'06001435500', h_state=u'06', w_state=u'06', sa_total=1, se_total=1, si_total=1, sa01_pct=1.0, sa02_pct=0.0, sa03_pct=0.0, se01_pct=0.0, se02_pct=1.0, se03_pct=0.0, si01_pct=0.0, si02_pct=0.0, si03_pct=1.0, from_tract=u'06001400100', to_tract=u'06001435500', miles=16.9, minutes=24.2, from_state=u'06', to_state=u'06')]
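
Under the assumption stated at the top of this section, every origin-destination pair inherits the travel time and distance of its tract pair. Below is a minimal sketch of that lookup with a plain dictionary keyed on `(from_tract, to_tract)`, using the values from the row above. `attach_drive` is a hypothetical helper, the second input row is made up, and dropping unmatched pairs mimics an inner join.

```python
drive_lookup = {
    ("06001400100", "06001435500"): {"miles": 16.9, "minutes": 24.2},
}

od_rows = [
    {"h_geocode": "060014001001036", "w_geocode": "060014355001011"},
    # Made-up pair whose tract pair has no drive-time record.
    {"h_geocode": "060014001001036", "w_geocode": "069999999991000"},
]

def attach_drive(rows, lookup):
    """Inner-join origin-destination rows to tract-level drive times."""
    joined = []
    for row in rows:
        key = (row["h_geocode"][:11], row["w_geocode"][:11])
        if key in lookup:  # unmatched pairs are dropped, as in an inner join
            merged = dict(row)
            merged.update(lookup[key])
            joined.append(merged)
    return joined

result = attach_drive(od_rows, drive_lookup)
print(len(result), result[0]["minutes"])
```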

Join full od with driving, giving us travel times for each origin-destination pair.  

In [None]:
#df = od.join(drive, [drive.from_tract == od.h_tract, drive.to_tract == od.w_tract])

The resulting dataframe is split across 200 partitions, the default value of `spark.sql.shuffle.partitions`, as the `getNumPartitions` method shows.

In [None]:
#df.rdd.getNumPartitions()
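
Conceptually, a shuffle join routes each row to a partition by hashing its join key, so rows with matching keys from both sides land in the same partition. The sketch below illustrates the idea only: `zlib.crc32` is a stand-in and not the hash function Spark uses internally, and `partition_for` is a hypothetical helper.

```python
import zlib

def partition_for(from_tract, to_tract, num_partitions=200):
    """Map a join key to a partition index in [0, num_partitions)."""
    key = (from_tract + "|" + to_tract).encode("utf-8")
    # crc32 is a deterministic stand-in; Spark uses its own hash internally.
    return (zlib.crc32(key) & 0xFFFFFFFF) % num_partitions

# The same key computed on either side of the join maps to the same partition.
od_side = partition_for("06001400100", "06001435500")
drive_side = partition_for("06001400100", "06001435500")
print(od_side == drive_side)
```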

In [None]:
#debug(df)

### Grouped aggregation: (block level) average proportion of goods-producing jobs per year

Again, start with California only  

The average share of California jobs in Goods Producing industry sectors (`si01_pct`) fell from about 20% in 2002 to about 15% in 2011, with the steepest drops between 2007 and 2010.

In [None]:
#df_ca = df_ca.withColumn("year", df_ca.year.cast(StringType()))

In [23]:
agg_ca = df_ca.groupBy("year").agg(avg("si01_pct"))
results_ca = agg_ca.collect()

In [24]:
agg_ca_df = pd.DataFrame(results_ca)

In [27]:
agg_ca_df.columns = ['year', 'avg_si01_pct']
agg_ca_df.sort_values('year')

| | year | avg_si01_pct |
|---|---|---|
| 13 | 2002 | 0.199606 |
| 0 | 2003 | 0.193124 |
| 6 | 2004 | 0.192965 |
| 9 | 2005 | 0.193048 |
| 3 | 2006 | 0.189356 |
| 1 | 2007 | 0.185661 |
| 12 | 2008 | 0.175127 |
| 8 | 2009 | 0.164687 |
| 10 | 2010 | 0.153194 |
| 11 | 2011 | 0.152197 |


In [None]:
agg_ca1 = df_ca.groupBy("year").agg(sum("si01"), sum('si02'), sum('si03'))
results_ca1 = agg_ca1.collect()

In [None]:
agg_ca_df1 = pd.DataFrame(results_ca1)

In [None]:
agg_ca_df1
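
The two aggregations in this section answer slightly different questions. `avg("si01_pct")` is an unweighted mean of row-level shares, so a pair with one job counts as much as a pair with fifty, whereas dividing summed counts gives a job-weighted share. Below is a small sketch with made-up rows; the numbers are illustrative and not taken from the data.

```python
from collections import defaultdict

# Made-up (year, si01, si_total) rows for illustration only.
rows = [
    (2002, 1, 1),
    (2002, 0, 1),
    (2002, 5, 50),
]

share_sums = defaultdict(float)  # sum of per-row shares by year
row_counts = defaultdict(int)    # number of rows by year
goods_jobs = defaultdict(int)    # summed si01 by year
all_jobs = defaultdict(int)      # summed si_total by year

for year, si01, si_total in rows:
    share_sums[year] += si01 / float(si_total)
    row_counts[year] += 1
    goods_jobs[year] += si01
    all_jobs[year] += si_total

# Unweighted mean of shares versus job-weighted share for the same year.
mean_of_shares = share_sums[2002] / row_counts[2002]
weighted_share = goods_jobs[2002] / float(all_jobs[2002])
print(mean_of_shares, weighted_share)
```

Here the unweighted mean is pulled up by the two single-job rows, while the weighted share is dominated by the fifty-job row.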

Now do it for the whole country

In [None]:
#df = df.withColumn("year", df.year.cast(StringType()))

In [None]:
#agg_full = df.groupBy("year").agg(avg("si01_pct"))
#results_full = agg_full.collect()

In [None]:
#agg_full_df = pd.DataFrame(results_full)

In [None]:
#agg_full_df

### Window function: 

In [None]:
# Requires df from the full join above, which is currently commented out
df = df.withColumn("year", df.year.cast("int"))

In [None]:
window = Window.orderBy("year").partitionBy("h_geocode", "w_geocode").rangeBetween(-1, 1)
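
With `orderBy("year")` and `rangeBetween(-1, 1)`, the frame for each row contains the rows of the same home-work pair whose `year` lies within one of the current row's year, so a gap in the series simply shrinks the frame. The notebook does not show which aggregate is applied over this window; the sketch below assumes an average of job counts purely for illustration, and `range_window_mean` is a hypothetical helper.

```python
def range_window_mean(series, lower=-1, upper=1):
    """Average the values whose year falls in [year + lower, year + upper].

    `series` maps year -> value for a single (h_geocode, w_geocode) pair.
    """
    result = {}
    for year in sorted(series):
        total, count = 0.0, 0
        for other_year, value in series.items():
            if year + lower <= other_year <= year + upper:
                total += value
                count += 1
        result[year] = total / count
    return result

# Made-up job counts for one pair; 2005 is missing on purpose.
jobs_by_year = {2002: 1, 2003: 3, 2004: 2, 2006: 4}
windowed = range_window_mean(jobs_by_year)
print(windowed)
```

Because the frame is defined on year values rather than row positions, 2006 averages only itself here: neither 2005 nor 2007 is present.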