# 1. Data Manipulation with Spark SQL

Prepared by Jason T. Brown

2020 Aug 29

Read JSON data into Spark SQL, then analyze the frequency of observations over time.

In [1]:
import pyspark
from pyspark.sql.types import *
import json


## Configure

The default local Spark session settings are used for portability. The data is small enough that processing completes very quickly in local mode.

This should run in any Python 3 environment with a `pip`-installed Spark 2.4.

In [2]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()

## ETL

Because the dataset is small, we read it into memory, convert it to a Spark RDD, and then to a Spark SQL DataFrame. This is a bit unusual.

In [3]:
with open('Data/classification/DM-classification.json') as f:
    data_and_schema = json.load(f)
    data = data_and_schema['data']

The field names we want to use are not reflected in the JSON file, so we define the schema by hand and rename the columns afterwards.

In [4]:
data_and_schema['schema']

{'fields': [{'name': 'index', 'type': 'integer'},
  {'name': 'content', 'type': 'string'},
  {'name': 'label', 'type': 'integer'},
  {'name': 'label_1', 'type': 'string'},
  {'name': 'label_2', 'type': 'string'},
  {'name': 'label_3', 'type': 'number'},
  {'name': 'label_4', 'type': 'datetime'}],
 'primaryKey': ['index'],
 'pandas_version': '0.20.0'}
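
As a rough illustration of the gap between the file's field names and the names we want, the sketch below lists the generic names from the schema above next to the target names taken from the comments in the next cell. The `RENAMES` dictionary and the `renamed` list are hypothetical helpers for this illustration only; the notebook itself does the renaming with `withColumnRenamed`.

```python
# Field names exactly as they appear in the schema printed above.
schema_fields = [
    {"name": "index", "type": "integer"},
    {"name": "content", "type": "string"},
    {"name": "label", "type": "integer"},
    {"name": "label_1", "type": "string"},
    {"name": "label_2", "type": "string"},
    {"name": "label_3", "type": "number"},
    {"name": "label_4", "type": "datetime"},
]

# Hypothetical lookup: generic name in the file -> name used in the analysis.
RENAMES = {
    "label_1": "size",
    "label_2": "usage",
    "label_3": "effect",
    "label_4": "date",
}

# Names not listed in RENAMES are kept unchanged.
renamed = [RENAMES.get(field["name"], field["name"]) for field in schema_fields]
print(renamed)
```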

In [5]:
schema = StructType([
    StructField('content', StringType()),
    StructField('label', IntegerType()),
    StructField('label_1', StringType()),    # size
    StructField('label_2', StringType()),    # usage
    StructField('label_3', DoubleType()),    # effect
    StructField('label_4', TimestampType())  # date
])

In [6]:
df = spark.read.json(spark.sparkContext.parallelize(data), 
                     schema=schema,
                     mode='FAILFAST') \
        .withColumnRenamed('label_1', 'size') \
        .withColumnRenamed('label_2', 'usage') \
        .withColumnRenamed('label_3', 'effect') \
        .withColumnRenamed('label_4', 'date') 

df.createOrReplaceTempView('df')

In [7]:
spark.sql("SELECT * FROM df LIMIT 10").show()

+--------------------+-----+------+--------+------------+-------------------+
|             content|label|  size|   usage|      effect|               date|
+--------------------+-----+------+--------+------------+-------------------+
|The battery is co...|    0| small|separate|0.7155163569|2015-06-05 18:41:08|
|What a big waste ...|    0|medium|conected| 0.858630808|2016-10-29 12:12:46|
|Don't waste your ...|    0| large|conected|0.2040485858|2016-04-29 14:44:31|
|Great sound and s...|    1| large|separate| 0.332641236|2017-12-26 13:25:48|
|Really pleased wi...|    1|medium|conected| 0.887390017|2016-04-30 00:01:08|
|One of my favorit...|    1| large|conected|0.2305351126|2016-04-30 17:29:03|
|best bluetooth on...|    1|medium|conected|0.4549175852|2017-04-24 04:26:54|
|Authentic leather...|    1| large|conected|0.3198441525|2015-12-16 22:03:11|
|I was very excite...|    1|medium|conected| 0.835863266|2015-05-19 01:34:19|
|Do not make the s...|    0| small|conected|0.1442302304|2015-02

## Process

In [8]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window

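First define the window: partition by `size`, order by `date`, and restrict the frame to the current row and the next one (`rowsBetween(0, 1)`). Taking the max minus the min of the Unix timestamp over this frame gives the difference between `date` in consecutive records, in seconds.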

In [9]:
ws = Window.partitionBy('size') \
           .orderBy('date') \
           .rowsBetween(0, 1) 
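
To make the frame concrete, here is a minimal plain-Python sketch of what a (current row, next row) frame computes within each partition. It is an illustration only, not Spark's implementation; `next_row_gaps` and `sample_rows` are hypothetical names, and the sample values are made up rather than taken from the dataset.

```python
from datetime import datetime
from itertools import groupby
from operator import itemgetter


def next_row_gaps(rows):
    """Return (size, date, gap_s) tuples, gap_s being seconds to the next row of that size.

    The last row in each partition has no successor, so its frame holds
    a single row and max - min is 0.
    """
    result = []
    ordered = sorted(rows, key=itemgetter(0, 1))  # partition key, then date
    for size, group in groupby(ordered, key=itemgetter(0)):
        dates = [date for _, date in group]
        for i, current in enumerate(dates):
            frame = dates[i:i + 2]  # current row plus at most one following row
            gap_s = int((max(frame) - min(frame)).total_seconds())
            result.append((size, current, gap_s))
    return result


# Hypothetical sample rows, not taken from the dataset.
sample_rows = [
    ("small", datetime(2015, 1, 1, 0, 0, 0)),
    ("large", datetime(2015, 1, 1, 12, 0, 0)),
    ("small", datetime(2015, 1, 2, 0, 0, 0)),
    ("small", datetime(2015, 1, 2, 6, 0, 0)),
]

gaps = next_row_gaps(sample_rows)
for size, date, gap_s in gaps:
    print(size, date, gap_s)
```

Note that the final row of every partition ends up with a gap of 0, since there is no later record to compare against.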

In [10]:
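# Gap in seconds between each record and the next one of the same size:
# max minus min of the Unix timestamp over the two-row frame.
date_gap_df = df.select(
    'size', 'date',
    (F.max(F.unix_timestamp(df.date)).over(ws) -
     F.min(F.unix_timestamp(df.date)).over(ws)).alias('date_gap_s'))

# Convert seconds to days for display only.
date_gap_df.withColumn('gap_days', F.col('date_gap_s') / 24 / 60 / 60).show()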

+------+-------------------+----------+-------------------+
|  size|               date|date_gap_s|           gap_days|
+------+-------------------+----------+-------------------+
|medium|2015-01-01 23:54:55|    188222|   2.17849537037037|
|medium|2015-01-04 04:11:57|    818339|  9.471516203703706|
|medium|2015-01-13 15:30:56|    113598| 1.3147916666666668|
|medium|2015-01-14 23:04:14|     75788|  0.877175925925926|
|medium|2015-01-15 20:07:22|     37015|0.42841435185185184|
|medium|2015-01-16 06:24:17|    144718|  1.674976851851852|
|medium|2015-01-17 22:36:15|    874460| 10.121064814814815|
|medium|2015-01-28 01:30:35|    232926| 2.6959027777777775|
|medium|2015-01-30 18:12:41|     92254| 1.0677546296296296|
|medium|2015-01-31 19:50:15|    155257| 1.7969560185185185|
|medium|2015-02-02 14:57:52|     50275| 0.5818865740740741|
|medium|2015-02-03 04:55:47|    303770|  3.515856481481481|
|medium|2015-02-06 17:18:37|    799491|  9.253368055555557|
|medium|2015-02-15 23:23:28|    365441| 

In [11]:
date_gap_df.filter(F.col('size') == 'large') \
            .withColumn('gap_days', F.col('date_gap_s') / 24 / 60 / 60) \
            .show(10)

+-----+-------------------+----------+------------------+
| size|               date|date_gap_s|          gap_days|
+-----+-------------------+----------+------------------+
|large|2015-01-02 11:44:05|    114974|1.3307175925925925|
|large|2015-01-03 19:40:19|    303618| 3.514097222222222|
|large|2015-01-07 08:00:37|     58255|0.6742476851851851|
|large|2015-01-08 00:11:32|    117074| 1.355023148148148|
|large|2015-01-09 08:42:46|    466253| 5.396446759259259|
|large|2015-01-14 18:13:39|     21786|0.2521527777777778|
|large|2015-01-15 00:16:45|    383858| 4.442800925925926|
|large|2015-01-19 10:54:23|    220700| 2.554398148148148|
|large|2015-01-22 00:12:43|   1098646|12.715810185185184|
|large|2015-02-03 17:23:29|    938456| 10.86175925925926|
+-----+-------------------+----------+------------------+
only showing top 10 rows



We can see that adding `gap_days` to the `date` on a given row gives the `date` on the following row.
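
This can be checked by hand with the first two `large` rows from the output above. The sketch below uses only the standard library; the variable names are illustrative, and the gap is added in seconds rather than fractional days to avoid floating-point rounding in the comparison.

```python
from datetime import datetime, timedelta

# Values copied from the first two `large` rows in the output above.
first_date = datetime(2015, 1, 2, 11, 44, 5)
date_gap_s = 114974
expected_next_date = datetime(2015, 1, 3, 19, 40, 19)

# Same conversion as the notebook uses for display.
gap_days = date_gap_s / 24 / 60 / 60

# Adding the gap to the first date should land on the second row's date.
next_date = first_date + timedelta(seconds=date_gap_s)

print(gap_days)
print(next_date, next_date == expected_next_date)
```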


## Explore data

Averaging the gaps by `size` shows that `small` records are the most frequent (shortest mean gap) and `large` records are the least frequent.

In [12]:
date_gap_df.groupby('size').agg(F.mean('date_gap_s')).show()

+------+-----------------+
|  size|  avg(date_gap_s)|
+------+-----------------+
|medium|         278014.0|
| small|274956.4340175953|
| large|    295512.553125|
+------+-----------------+
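
One detail to keep in mind when reading these averages: with a frame of (current row, next row), the last record in each partition has no successor, so its gap is 0 and it is still counted in the mean. The sketch below uses hypothetical gap values (not from the dataset) to show the effect; with n rows the mean over all rows is (n - 1) / n times the mean over real consecutive pairs, so the difference is small for partitions with hundreds of rows.

```python
from statistics import mean

# Hypothetical gaps in seconds for one partition, ordered by date.
# The final 0 stands for the last record, which has no following row.
gaps_s = [86400, 21600, 43200, 0]

mean_with_last = mean(gaps_s)          # plain average over every row
mean_without_last = mean(gaps_s[:-1])  # average over real consecutive pairs only

print(mean_with_last, mean_without_last)
```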



## Save 

We will save the results to a CSV file.

Because the data is small, we convert it to a pandas DataFrame first; pandas has somewhat nicer functionality for writing to CSV.

I chose not to save the conversion to days, because saving the same data in different units is an anti-pattern: downstream, the duplicated columns can drift out of sync with each other and cause problems.

In [13]:
date_gap_df.toPandas().to_csv('date_gap.csv', header=True, index=False)

In [14]:
!head date_gap.csv

size,date,date_gap_s
medium,2015-01-01 23:54:55,188222
medium,2015-01-04 04:11:57,818339
medium,2015-01-13 15:30:56,113598
medium,2015-01-14 23:04:14,75788
medium,2015-01-15 20:07:22,37015
medium,2015-01-16 06:24:17,144718
medium,2015-01-17 22:36:15,874460
medium,2015-01-28 01:30:35,232926
medium,2015-01-30 18:12:41,92254
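
Since only seconds are stored, a consumer that wants days can derive them when reading. The following is a minimal sketch using the standard library; `read_gaps_with_days` and `SECONDS_PER_DAY` are hypothetical names, and the first two data rows of the file are inlined so the example runs on its own.

```python
import csv
import io

SECONDS_PER_DAY = 24 * 60 * 60

# Header and first two rows of date_gap.csv as shown above.
csv_text = """size,date,date_gap_s
medium,2015-01-01 23:54:55,188222
medium,2015-01-04 04:11:57,818339
"""


def read_gaps_with_days(handle):
    """Yield each CSV row as a dict with a derived `gap_days` value."""
    for row in csv.DictReader(handle):
        gap_s = int(row["date_gap_s"])
        yield {**row, "date_gap_s": gap_s, "gap_days": gap_s / SECONDS_PER_DAY}


rows = list(read_gaps_with_days(io.StringIO(csv_text)))
for row in rows:
    print(row["size"], row["date"], round(row["gap_days"], 3))
```

Deriving the value on read keeps a single source of truth in the file, at the cost of repeating the conversion wherever days are needed.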
