In [1]:
import os

# Bootstrap PySpark inside this kernel by running the distribution's shell.py
# in the notebook namespace; that script is what makes `spark` available to
# the cells below.
# NOTE(review): this executes whatever file SPARK_HOME points at, so the
# environment variable must come from a trusted source.
shell_path = os.path.join(os.environ["SPARK_HOME"], 'python/pyspark/shell.py')
with open(shell_path) as shell_file:  # close the handle instead of leaking it
    # Compiling with the real path makes tracebacks point at shell.py.
    exec(compile(shell_file.read(), shell_path, 'exec'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 3.7.2 (v3.7.2:9a3ffc0492, Dec 24 2018 02:44:43)
SparkSession available as 'spark'.


In [2]:
from pyspark.sql.functions import min, max, lit, from_unixtime, to_timestamp, substring
from pyspark.sql.functions import sum, count, avg, expr, col

#### ELPC Raw Data

In [3]:
# Load the raw ELPC measurements: a tab-separated file with a header row,
# with column types inferred by Spark.
elpc = (
    spark.read.format("csv")
    .option("inferSchema", "true")
    .option("sep", "\t")
    .option("header", "true")
    .load("measurements.tsv")
)

In [4]:
# Columns dropped from the raw ELPC frame before any further processing.
exclude_columns = {
    'created_at',
    'hexagon_id',
    'measured_value',
    'milliseconds',
    'neighborhood_id',
    'timezone_offset',
}
# Keep the remaining columns in their original order.
elpc = elpc.select(*(name for name in elpc.columns if name not in exclude_columns))

In [5]:
# Derive the hourly bucket for each reading and tag every row with its
# source subsystem and measured parameter.
# The first 13 characters of `time` are "yyyy-MM-dd HH". Spark's substring()
# is 1-based, so the slice starts at position 1 (position 0 only happened to
# give the same result).
elpc_datetime = (
    elpc
    .withColumn("hour", to_timestamp(substring("time", 1, 13), "yyyy-MM-dd HH"))
    .withColumn("subsystem", lit("ELPC"))
    .withColumn("parameter", lit("pm2_5"))
)
elpc_datetime.show(1)

+---+-----+----------+----------+-------------------+---------+-------+-----------+----------+-------------------+---------+---------+
| id|value|  latitude| longitude|               time|stream_id|ward_id|   tract_id|zipcode_id|               hour|subsystem|parameter|
+---+-----+----------+----------+-------------------+---------+-------+-----------+----------+-------------------+---------+---------+
|  0| 1.22|41.8867226|-87.626929|2017-03-14 12:44:02|    80942|     42|17031320100|     60601|2017-03-14 12:00:00|     ELPC|    pm2_5|
+---+-----+----------+----------+-------------------+---------+-------+-----------+----------+-------------------+---------+---------+
only showing top 1 row



In [17]:
# Average the ELPC readings per zipcode / location / subsystem / parameter / hour.
#
# The previous version chained `.alias('value')` after `.avg('value')`. That
# call aliases the resulting DataFrame, not the aggregate column, so the
# column was still named `avg(value)`. The misleading alias is removed here;
# the column name stays `avg(value)` because the later union cell selects it
# under that name.
elpc_hourly = (
    elpc_datetime
    .groupby(
        col('zipcode_id').alias('zipcode'),
        col('latitude').alias('lat'),
        col('longitude').alias('long'),
        'subsystem',
        'parameter',
        col('hour').alias('datetime'),
    )
    .agg(avg('value'))
)
elpc_hourly.show(1)


+-------+----------+-----------+---------+---------+-------------------+-----------------+
|zipcode|       lat|       long|subsystem|parameter|           datetime|       avg(value)|
+-------+----------+-----------+---------+---------+-------------------+-----------------+
|  60640|41.9742408|-87.6782954|     ELPC|    pm2_5|2017-04-04 08:00:00|41.95923076923077|
+-------+----------+-----------+---------+---------+-------------------+-----------------+
only showing top 1 row



#### AoT Raw Data

In [7]:
# Load the AoT aggregates CSV (header row present, types inferred) and
# preview one row.
aot = (
    spark.read.format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("Chicago AoT Monthly Aggregates.csv")
)
aot.show(1)

+----------+------------+---------+------+-------------+--------------------+
| timestamp|     node_id|subsystem|sensor|    parameter|       avg_value_hrf|
+----------+------------+---------+------+-------------+--------------------+
|1525989600|001e06113107|chemsense|    co|concentration|-0.45517749999999996|
+----------+------------+---------+------+-------------+--------------------+
only showing top 1 row



In [8]:
# Load the node metadata file that maps each node_id to a location and
# zipcode, then preview one row.
node_zips = (
    spark.read.format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("NodeswZip.csv")
)
node_zips.show(1)

+------------+-----------+---+--------------------+---------+----------+-------------------+---------------+-------------+-------+
|     node_id| project_id|vsn|             address|      lat|       lon|        description|start_timestamp|end_timestamp|zipcode|
+------------+-----------+---+--------------------+---------+----------+-------------------+---------------+-------------+-------+
|001e0610ba46|AoT_Chicago|  4|State St & Jackso...|41.878377|-87.627678|AoT Chicago (S) [C]|   10/9/17 0:00|         null|  60604|
+------------+-----------+---+--------------------+---------+----------+-------------------+---------------+-------------+-------+
only showing top 1 row



In [9]:
# Add a readable `datetime` column computed from the numeric `timestamp`.
# NOTE(review): from_unixtime() yields a formatted string rather than a
# timestamp type, and presumably renders it in the session time zone; confirm
# this matches the ELPC `hour` column before comparing the two sources.
aot_hourly = aot.withColumn('datetime', from_unixtime(col('timestamp')))
aot_hourly.show(1)

+----------+------------+---------+------+-------------+--------------------+-------------------+
| timestamp|     node_id|subsystem|sensor|    parameter|       avg_value_hrf|           datetime|
+----------+------------+---------+------+-------------+--------------------+-------------------+
|1525989600|001e06113107|chemsense|    co|concentration|-0.45517749999999996|2018-05-10 17:00:00|
+----------+------------+---------+------+-------------+--------------------+-------------------+
only showing top 1 row



In [10]:
# Attach node metadata (address, lat/lon, zipcode, ...) to every AoT reading.
# A left join keeps readings even when their node_id has no metadata row.
aot_zip = aot_hourly.join(
    other=node_zips,
    on='node_id',
    how='left',
)
aot_zip.show(1)

+------------+----------+---------+------+-------------+--------------------+-------------------+-----------+---+--------------------+---------+---------+--------------------+---------------+-------------+-------+
|     node_id| timestamp|subsystem|sensor|    parameter|       avg_value_hrf|           datetime| project_id|vsn|             address|      lat|      lon|         description|start_timestamp|end_timestamp|zipcode|
+------------+----------+---------+------+-------------+--------------------+-------------------+-----------+---+--------------------+---------+---------+--------------------+---------------+-------------+-------+
|001e06113107|1525989600|chemsense|    co|concentration|-0.45517749999999996|2018-05-10 17:00:00|AoT_Chicago| 72|7801 S Lawndale A...|41.751142|-87.71299|AoT Chicago (S) [...|    1/1/17 0:00|         null|  60652|
+------------+----------+---------+------+-------------+--------------------+-------------------+-----------+---+--------------------+---------+

In [16]:
# Average the AoT readings per zipcode / subsystem / location / sensor /
# parameter / datetime. The key order matters: the later union with the ELPC
# frame is positional. The aggregate column is named `avg(avg_value_hrf)`.
aot_hourly = (
    aot_zip
    .groupBy(
        'zipcode',
        'subsystem',
        'lat',
        col('lon').alias('long'),
        'sensor',
        'parameter',
        'datetime',
    )
    .agg(avg('avg_value_hrf'))
)
aot_hourly.show(1)

+-------+---------+---------+----------+------+-------------+-------------------+--------------------+
|zipcode|subsystem|      lat|      long|sensor|    parameter|           datetime|  avg(avg_value_hrf)|
+-------+---------+---------+----------+------+-------------+-------------------+--------------------+
|  60601|chemsense|41.884607|-87.624577|   no2|concentration|2018-05-10 19:00:00|0.006243049645390072|
+-------+---------+---------+----------+------+-------------+-------------------+--------------------+
only showing top 1 row



In [21]:
# The ELPC frame has no `sensor` column; add a constant one so its schema
# lines up with the AoT frame. withColumn() replaces an existing column of
# the same name, so re-running this cell is harmless.
elpc_hourly = elpc_hourly.withColumn(
    'sensor',
    lit('mobile'),
)
elpc_hourly.show(1)

+-------+----------+-----------+---------+---------+-------------------+-----------------+------+
|zipcode|       lat|       long|subsystem|parameter|           datetime|       avg(value)|sensor|
+-------+----------+-----------+---------+---------+-------------------+-----------------+------+
|  60640|41.9742408|-87.6782954|     ELPC|    pm2_5|2017-04-04 08:00:00|41.95923076923077|mobile|
+-------+----------+-----------+---------+---------+-------------------+-----------------+------+
only showing top 1 row



In [23]:
# Stack the two hourly frames. union() matches columns by position, not by
# name, so the ELPC columns are selected in the same order as the AoT frame;
# the result keeps the AoT column names.
combined = aot_hourly.union(
    elpc_hourly.select([
        'zipcode',
        'subsystem',
        'lat',
        'long',
        'sensor',
        'parameter',
        'datetime',
        'avg(value)',
    ])
)

In [24]:
# Write the combined dataset as CSV into the `combined_dataset` directory.
#
# The save mode must be set with DataFrameWriter.mode(): passing it as
# option("mode", ...) does not change the save mode, so the writer kept its
# default and failed whenever the output directory already existed
# (i.e. on every re-run).
# The pattern uses `HH` (24-hour clock); `hh` is the 12-hour clock and would
# make afternoon hours ambiguous. `dateFormat` only applies to date columns,
# so `timestampFormat` is set as well to cover timestamp columns.
(
    combined.write.format("csv")
    .mode("overwrite")
    .option("dateFormat", "yyyy-MM-dd HH:mm:ss")
    .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
    .option("path", "combined_dataset")
    .save()
)