In [1]:
import os

In [2]:
# os.environ['PYSPARK_PYTHON'] = "python"


In [1]:
from pyspark.sql import SparkSession

In [2]:
# Start (or reuse) a local Spark session running on a single worker thread.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName('KS1')
    .getOrCreate()
)

In [3]:
import sys

# Report the interpreter version and executable path used by this kernel,
# one value per line.
print(sys.version, sys.executable, sep="\n")

3.10.9 | packaged by Anaconda, Inc. | (main, Mar  1 2023, 18:18:15) [MSC v.1916 64 bit (AMD64)]
C:\Users\ASUS\anaconda3\python.exe


### Read file and print schema

**Initial exploration of data**

In [4]:
# Load the JSON file into a DataFrame; no schema is passed, so Spark infers it.
df = spark.read.json("Digital_Music.json")

In [5]:
# Print the column names, types and nullability of df.
df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- image: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- style: struct (nullable = true)
 |    |-- Color:: string (nullable = true)
 |    |-- Format:: string (nullable = true)
 |    |-- Size:: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)
 |-- verified: boolean (nullable = true)
 |-- vote: string (nullable = true)



In [None]:
# Column order used for the CSV schema below: (item, user, rating, timestamp)

In [6]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType

# Explicit schema for the header-less CSV: four nullable columns, in file order.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("item", StringType()),
        ("user", StringType()),
        ("rating", FloatType()),
        ("timestamp", StringType()),
    )
])

In [7]:
# Read the header-less CSV, applying the explicit schema instead of inferring one.
df2 = spark.read.csv(
    "Patio_Lawn_and_Garden.csv",
    schema=schema,
    header=False,
)

In [8]:
# Preview the leading rows of df (the JSON-based frame).
# Note: this displays df, not the CSV frame df2 read in the previous cell.
df.show()

+----------+-----+-------+--------------------+-----------+--------------+-------------------+--------------------+--------------------+--------------+--------+----+
|      asin|image|overall|          reviewText| reviewTime|    reviewerID|       reviewerName|               style|             summary|unixReviewTime|verified|vote|
+----------+-----+-------+--------------------+-----------+--------------+-------------------+--------------------+--------------------+--------------+--------+----+
|0001388703| NULL|    5.0|This is a great c...|12 22, 2013|A1ZCPG3D3HGRSS|     mark l. massey|{NULL,  Audio CD,...|    Great worship cd|    1387670400|    true|NULL|
|0001388703| NULL|    5.0|So creative!  Lov...|09 11, 2013| AC2PL52NKPL29|       Norma Mushen|{NULL,  Audio CD,...|Gotta listen to t...|    1378857600|    true|NULL|
|0001388703| NULL|    5.0|Keith Green, gone...| 03 2, 2013|A1SUZXBDZSDQ3A| Herbert W. Shurley|{NULL,  Audio CD,...|Great approach st...|    1362182400|    true|NULL|
|000

**Creating a dummy window partition to use with max and min functions over the entire data**

In [9]:
from pyspark.sql import functions as f
from pyspark.sql import Window

In [10]:
# Add a constant column (literal 1) so every row carries the same value;
# it serves as the partition key of the window defined in the next cell.
df = df.withColumn("fullWindow", f.lit(1))

In [11]:
# Window partitioned by the constant 'fullWindow' column: since all rows share
# the same key, aggregates over this window span the whole dataset.
w = Window.partitionBy('fullWindow')

**Total number of records**

In [12]:
# Total number of rows in df.
df.count()

1584082

**Calculate the maximum of the `overall` rating column over the dummy window and store it in a new column `maxRating`, then keep only the rows whose rating equals `maxRating`. The same operation is repeated for the minimum, using a `minRating` column.**

In [13]:
# Attach the dataset-wide maximum of 'overall' to every row, then keep only
# the rows whose own rating equals that maximum.
df_max = (
    df.withColumn('maxRating', f.max('overall').over(w))
      .filter(f.col('overall') == f.col('maxRating'))
)

In [14]:
# Number of rows whose rating equals the maximum rating.
df_max.count()

1280147

In [15]:
# Attach the dataset-wide minimum of 'overall' to every row, then keep only
# the rows whose own rating equals that minimum.
df_min = (
    df.withColumn('minRating', f.min('overall').over(w))
      .filter(f.col('overall') == f.col('minRating'))
)

In [16]:
# Number of rows whose rating equals the minimum rating.
df_min.count()

43108

In [17]:
# Preview the leading rows that carry the minimum rating.
df_min.show()

+----------+-----+-------+--------------------+-----------+--------------+-------------------+--------------------+--------------------+--------------+--------+----+----------+---------+
|      asin|image|overall|          reviewText| reviewTime|    reviewerID|       reviewerName|               style|             summary|unixReviewTime|verified|vote|fullWindow|minRating|
+----------+-----+-------+--------------------+-----------+--------------+-------------------+--------------------+--------------------+--------------+--------+----+----------+---------+
|0001388703| NULL|    1.0|This tape can har...|05 14, 2009|A3NVGWKHLULDHR|Therese M. Teasdale|{NULL,  Audio Cas...|         Shame Shame|    1242259200|   false|NULL|         1|      1.0|
|0001388703| NULL|    1.0|Buy the CD.  Do n...|05 15, 2015| AZ3T21W6CW0MW|               None|{NULL,  MP3 Music...|Buy the CD.  Do n...|    1431648000|    true|NULL|         1|      1.0|
|B00YDWW4WI| NULL|    1.0|The album is just...| 07 9, 2016|A2TB2T

### Longest review in data

**Using length function on reviewText column followed by max**

In [27]:
# Compute the longest reviewText length across the whole dataset and keep
# only the row(s) whose review text has exactly that length.
df_max_length = (
    df.withColumn('maxLength', f.max(f.length('reviewText')).over(w))
      .filter(f.length('reviewText') == f.col('maxLength'))
)

In [21]:
# Display the row(s) holding the longest review text.
df_max_length.show()

+----------+-----+-------+--------------------+-----------+--------------+-------------+--------------------+--------------------+--------------+--------+----+----------+---------+
|      asin|image|overall|          reviewText| reviewTime|    reviewerID| reviewerName|               style|             summary|unixReviewTime|verified|vote|fullWindow|maxLength|
+----------+-----+-------+--------------------+-----------+--------------+-------------+--------------------+--------------------+--------------+--------+----+----------+---------+
|B00FZ11C0G| NULL|    3.0|This is my novel-...|03 26, 2013|A2NAWWR03ZBUTB|Just Some Guy|{NULL,  Audio CD,...|What if Thomas Ke...|    1364256000|    true|  26|         1|    32501|
+----------+-----+-------+--------------------+-----------+--------------+-------------+--------------------+--------------------+--------------+--------+----+----------+---------+



In [22]:
# Number of rows that tie for the longest review text.
df_max_length.count()

1

In [28]:
# Parse the 'reviewTime' string (pattern "MM d, yyyy") into a date and append
# it to df as the new column 'reviewDateFormatted'.
df_date = df.withColumn(
    "reviewDateFormatted",
    f.to_date(f.col("reviewTime"), "MM d, yyyy"),
)

In [24]:
# Preview the leading rows, including the parsed 'reviewDateFormatted' column.
df_date.show()

+----------+-----+-------+--------------------+-----------+--------------+-------------------+--------------------+--------------------+--------------+--------+----+----------+-------------------+
|      asin|image|overall|          reviewText| reviewTime|    reviewerID|       reviewerName|               style|             summary|unixReviewTime|verified|vote|fullWindow|reviewDateFormatted|
+----------+-----+-------+--------------------+-----------+--------------+-------------------+--------------------+--------------------+--------------+--------+----+----------+-------------------+
|0001388703| NULL|    5.0|This is a great c...|12 22, 2013|A1ZCPG3D3HGRSS|     mark l. massey|{NULL,  Audio CD,...|    Great worship cd|    1387670400|    true|NULL|         1|         2013-12-22|
|0001388703| NULL|    5.0|So creative!  Lov...|09 11, 2013| AC2PL52NKPL29|       Norma Mushen|{NULL,  Audio CD,...|Gotta listen to t...|    1378857600|    true|NULL|         1|         2013-09-11|
|0001388703| NU

In [29]:
# Window over the constant 'fullWindow' key, ordered by the review's unix
# timestamp, so row-relative functions follow chronological order.
windowSpec = (
    Window
    .partitionBy("fullWindow")
    .orderBy("unixReviewTime")
)

In [30]:
# For each row, store the 'unixReviewTime' of the previous row (offset 1) in
# the ordered window; the first row has no predecessor and gets NULL.
df_date_lag = df_date.withColumn(
    "lag_time",
    f.lag("unixReviewTime", 1).over(windowSpec),
)

In [55]:
# Preview the leading rows together with the new 'lag_time' column.
df_date_lag.show()

+----------+-----+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------------+--------------+--------+----+----------+-------------------+---------+
|      asin|image|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|               style|             summary|unixReviewTime|verified|vote|fullWindow|reviewDateFormatted| lag_time|
+----------+-----+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------------+--------------+--------+----+----------+-------------------+---------+
|9714721180| NULL|    5.0|Released on Novem...|10 20, 1997|A2SY49Z6H96QVR|   dcollins@s3two.ie|{NULL,  Audio CD,...|Trash metal miles...|     877305600|   false|   5|         1|         1997-10-20|     NULL|
|B0013D89TW| NULL|    5.0|This two CD retro...|11 28, 1997|A2XT7XLQ8DD4NZ|     kblalak@usa.net|{NULL,  Audio CD,...|         Mott Rocks!|     880675200|   false|NULL|  

In [31]:
# Persist the result as Parquet under "output_parq", replacing any existing output.
df_date_lag.write.parquet("output_parq", mode="overwrite")

In [None]:
# Write to SQL Table sample code
def write_to_sql_table(sample_df, url, dbtable, user, password, mode="overwrite"):
    """Write a DataFrame to a SQL Server table through the Spark connector.

    The original sample ran the write chain directly on an undefined
    ``sampleDF`` with placeholder credentials embedded in the cell, so it
    failed with ``NameError`` on "Run All". Wrapping the chain in a function
    keeps the sample available without executing it, and lets the caller
    supply the connection details instead of hard-coding them.

    Args:
        sample_df: DataFrame (any object exposing ``.write``) to persist.
        url: JDBC connection URL, e.g.
            ``"jdbc:sqlserver://<server>;databaseName=emp;"``.
        dbtable: Name of the destination table.
        user: Login used for the connection.
        password: Password for ``user``; read it from the environment or a
            secrets store rather than typing it into the notebook.
        mode: Save mode passed to the writer; defaults to ``"overwrite"`` as
            in the original sample.

    Returns:
        None. The function only performs the write.
    """
    (
        sample_df.write
        .format("com.microsoft.sqlserver.jdbc.spark")
        .mode(mode)
        .option("url", url)
        .option("dbtable", dbtable)
        .option("user", user)
        .option("password", password)
        .save()
    )


# Example call (left commented out so "Run All" does not require a live server
# or credentials; the environment variable names are only suggestions):
# write_to_sql_table(
#     df_date_lag,
#     url=f"jdbc:sqlserver://{os.environ['SQLSERVER_ADDR']};databaseName=emp;",
#     dbtable="employee",
#     user=os.environ["SQLSERVER_USER"],
#     password=os.environ["SQLSERVER_PASSWORD"],
# )