In [111]:
## Importing required libraries
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode, unix_timestamp, col, to_date, to_timestamp
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, TimestampType, DoubleType, ByteType, ShortType, LongType, FloatType, BooleanType

In [112]:
## SparkSession has been setup to create a DataFrame, register DataFrame as views, execute SQL over tables, read JSON files.
spark = SparkSession \
    .builder \
    .appName("Python Spark Data Frame extraction") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
sc = spark.sparkContext

In [113]:
## Read the nested JSON file
df = spark.read.option("multiline", True).json("./DM-classification.json")

## Print the schema of the file that has been read.
df.printSchema()

## Print the data frame.
df.show()

root
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- content: string (nullable = true)
 |    |    |-- index: long (nullable = true)
 |    |    |-- label: long (nullable = true)
 |    |    |-- label_1: string (nullable = true)
 |    |    |-- label_2: string (nullable = true)
 |    |    |-- label_3: double (nullable = true)
 |    |    |-- label_4: string (nullable = true)
 |-- schema: struct (nullable = true)
 |    |-- fields: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |-- pandas_version: string (nullable = true)
 |    |-- primaryKey: array (nullable = true)
 |    |    |-- element: string (containsNull = true)

+--------------------+--------------------+
|                data|              schema|
+--------------------+--------------------+
|[[The battery is ...|[[[index, integer...|
+--------------

In [114]:
## Exploding the data column of the dataframe that has been extracted, in order to obtain the content and the data labels.
data_df = df.select(explode(df["data"])).toDF("temp").select("temp.content", "temp.label", "temp.label_1", "temp.label_2", "temp.label_3", "temp.label_4")

## Creating a new column having datetime datatype and the content of label_4.
data_df = data_df.withColumn("datetime", to_timestamp(unix_timestamp(col('label_4'),"yyyy-MM-dd'T'HH:mm:ss.SSSXXX").cast('timestamp')))

## Dropping label_4 from the dataframe.
data_df = data_df.drop("label_4")

## Print the data frame after adding and dropping a column.
data_df.show()

+--------------------+-----+-------+--------+------------+-------------------+
|             content|label|label_1| label_2|     label_3|           datetime|
+--------------------+-----+-------+--------+------------+-------------------+
|The battery is co...|    0|  small|separate|0.7155163569|2015-06-05 18:41:08|
|What a big waste ...|    0| medium|conected| 0.858630808|2016-10-29 12:12:46|
|Don't waste your ...|    0|  large|conected|0.2040485858|2016-04-29 14:44:31|
|Great sound and s...|    1|  large|separate| 0.332641236|2017-12-26 13:25:48|
|Really pleased wi...|    1| medium|conected| 0.887390017|2016-04-30 00:01:08|
|One of my favorit...|    1|  large|conected|0.2305351126|2016-04-30 17:29:03|
|best bluetooth on...|    1| medium|conected|0.4549175852|2017-04-24 04:26:54|
|Authentic leather...|    1|  large|conected|0.3198441525|2015-12-16 22:03:11|
|I was very excite...|    1| medium|conected| 0.835863266|2015-05-19 01:34:19|
|Do not make the s...|    0|  small|conected|0.14423

In [115]:
## Creating a list of all the rows that are present in the dataframe obtained after the above modifications have been made.  
Data = list(data_df.collect())

In [116]:
## Explode the fields part of the dataframe to obtain the data types of all the columns.
col_list = df.select(explode(df["schema.fields"])).toDF("level1").select("level1.type")
col_list.show()

## Extracting the above obtained datatypes to a list.
datatype_list = [row[0] for row in col_list.select("type").collect()]
print(datatype_list)

+--------+
|    type|
+--------+
| integer|
|  string|
| integer|
|  string|
|  string|
|  number|
|datetime|
+--------+

['integer', 'string', 'integer', 'string', 'string', 'number', 'datetime']


In [117]:
## Function to obtain the datatype from the JSON data that is avaialble.
def getType(raw):
    switch = {  
    "byte": ByteType(),
    "short": ShortType(),
    "integer": IntegerType(),
    "long": LongType(),
    "float": FloatType(),
    "number": DoubleType(),
    "boolean": BooleanType(),
    "datetime": TimestampType(),
    }
    return switch.get(raw, StringType())

In [118]:
## Creating a schema containing column names as per the question and data types as per the JSON data that was available and finally extracted to a list in the above steps.
## The getType function has been used to convert the field data available in string to datatype type.
Schema = StructType([
    StructField("content", getType(datatype_list[1]),True),
    StructField("label", getType(datatype_list[2]),True),
    StructField("size", getType(datatype_list[3]),True),
    StructField("usage", getType(datatype_list[4]),True),
    StructField("effect", getType(datatype_list[5]),True),
    StructField("date", getType(datatype_list[6]),True)
])

In [121]:
## Create a dataframe by combining the list of rows and the schema that have 
final_df = spark.createDataFrame(data=Data, schema=Schema)

## Print the schema of the data frame that has been created with the modified column names and data types.
final_df.printSchema()

## Print the data frame that has thus been obtained.
final_df.show()

root
 |-- content: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- size: string (nullable = true)
 |-- usage: string (nullable = true)
 |-- effect: double (nullable = true)
 |-- date: timestamp (nullable = true)

+--------------------+-----+------+--------+------------+-------------------+
|             content|label|  size|   usage|      effect|               date|
+--------------------+-----+------+--------+------------+-------------------+
|The battery is co...|    0| small|separate|0.7155163569|2015-06-05 18:41:08|
|What a big waste ...|    0|medium|conected| 0.858630808|2016-10-29 12:12:46|
|Don't waste your ...|    0| large|conected|0.2040485858|2016-04-29 14:44:31|
|Great sound and s...|    1| large|separate| 0.332641236|2017-12-26 13:25:48|
|Really pleased wi...|    1|medium|conected| 0.887390017|2016-04-30 00:01:08|
|One of my favorit...|    1| large|conected|0.2305351126|2016-04-30 17:29:03|
|best bluetooth on...|    1|medium|conected|0.4549175852|2017-04-

In [106]:
## Create a new temporary view using a SparkDataFrame in the Spark Session. 
## If a temporary view with the same name already exists, it is replaced.
final_df.createOrReplaceTempView("DM")

In [107]:
## Grouping the table by 'size', and sort based on 'date' and extract difference between 'date' in consecutive records'
groupedby =spark.sql("SELECT *, date - lag(date,1) OVER(PARTITION BY size order by date) as diff_date FROM DM")

## Creating a new column that is the difference between 'date' in consecutive records (within groups).
groupedby = groupedby.withColumn("diff_date", col("diff_date").cast('String'))
groupedby.printSchema()
groupedby.show()

root
 |-- content: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- size: string (nullable = true)
 |-- usage: string (nullable = true)
 |-- effect: double (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- diff_date: string (nullable = true)

+--------------------+-----+------+--------+------------+-------------------+--------------------+
|             content|label|  size|   usage|      effect|               date|           diff_date|
+--------------------+-----+------+--------+------------+-------------------+--------------------+
|Better than you'd...|    1|medium|separate| 0.049132322|2015-01-01 23:54:55|                null|
|I love the look a...|    1|medium|separate|0.5040957229|2015-01-04 04:11:57|52 hours 17 minut...|
|lightweight and w...|    1|medium|conected|0.2213363547|2015-01-13 15:30:56|227 hours 18 minu...|
|EXCELLENT SERVICE...|    1|medium|separate|0.6684907436|2015-01-14 23:04:14|31 hours 33 minut...|
|Everything about ...|    1|mediu

In [108]:
## Saving the above obtained data into a CSV file. The level of parallelism can be increased for better efficiency by changing the parameter in repartition option.
groupedby.repartition(1).write.format('csv').save("./output/Manipulated_data.csv", header = 'true')

AnalysisException: path file:/home/jovyan/work/output/Manipulated_data.csv already exists.;