## Pyspark script for data transformation

In [1]:
from pyspark.sql import SparkSession

# Initialize Spark session with GCS configurations
spark = SparkSession.builder \
    .appName("AccessGCSData") \
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .config("spark.hadoop.fs.gs.auth.service.account.enable", "true") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/19 10:56:37 INFO SparkEnv: Registering MapOutputTracker
24/11/19 10:56:37 INFO SparkEnv: Registering BlockManagerMaster
24/11/19 10:56:37 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
24/11/19 10:56:37 INFO SparkEnv: Registering OutputCommitCoordinator


In [2]:
# Print Spark session to verify
print(spark.version)

3.5.1


In [3]:
gcs_path = 'gs://amazon-data-analysis/amazon.csv'

In [27]:
df = spark.read.csv(gcs_path, header = True, inferSchema = True)

                                                                                

In [28]:
df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- discounted_price: string (nullable = true)
 |-- actual_price: string (nullable = true)
 |-- discount_percentage: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- rating_count: string (nullable = true)
 |-- about_product: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_name: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- review_title: string (nullable = true)
 |-- review_content: string (nullable = true)
 |-- img_link: string (nullable = true)
 |-- product_link: string (nullable = true)



In [29]:
from pyspark.sql import functions as F

# Get the count of null values for each column in the DataFrame
null_counts = df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns])

# Show the null counts
null_counts.show()




+----------+------------+--------+-------+----------------+------------+-------------------+-------+------------+-------------+-------+---------+---------+------------+--------------+--------+------------+
|product_id|product_name|category|    _c3|discounted_price|actual_price|discount_percentage| rating|rating_count|about_product|user_id|user_name|review_id|review_title|review_content|img_link|product_link|
+----------+------------+--------+-------+----------------+------------+-------------------+-------+------------+-------------+-------+---------+---------+------------+--------------+--------+------------+
|         0|     2585908| 2585908|2587316|         2585911|     2585912|            2585924|2585917|     2585923|      2585917|2585911|  2585908|  2585908|     2585908|       2585908| 2585908|     2585908|
+----------+------------+--------+-------+----------------+------------+-------------------+-------+------------+-------------+-------+---------+---------+------------+--------

                                                                                

In [30]:
# Drop the column 'Unnamed: 3' from the DataFrame
df = df.drop('_c3')

# Show the resulting DataFrame
#df.show()

In [31]:
df = df.dropna()

In [32]:
df = df.dropDuplicates()

In [33]:
from pyspark.sql.functions import split, col

# Split the 'product_name' column by the specified special characters or words (comma, &, ., or, and, (, {)
df = df.withColumn('product_name', 
                  split(col('product_name'), r"[,\.&\(\{|]|(?=\s*(?:or|and))").getItem(0))


In [34]:
# Get the unique values from the 'product_name' column
df.select('product_name').distinct().show(1)




+--------------------+
|        product_name|
+--------------------+
|LOHAYA Voice Assi...|
+--------------------+
only showing top 1 row



                                                                                

In [35]:
# Split the 'category' column by the '|' character and extract the first part
df = df.withColumn('category', 
                  split(col('category'), r'\|').getItem(0))

In [36]:
from pyspark.sql.functions import regexp_extract

# Extract the numeric value (integer or float) from the 'discounted_price' column
df = df.withColumn('discounted_price', 
                  regexp_extract(col('discounted_price'), r'(\d+(\.\d+)?)', 0).cast('float'))

In [37]:
# Extract the numeric value (integer or float) from the 'actual_price' column
df = df.withColumn('actual_price', 
                  regexp_extract(col('actual_price'), r'(\d+(\.\d+)?)', 0).cast('float'))

In [38]:
# Extract the numeric value (integer or float) from the 'discount_percentage' column
df = df.withColumn('discount_percentage', 
                  regexp_extract(col('discount_percentage'), r'(\d+(\.\d+)?)', 0).cast('float'))

In [43]:
# Remove rows where 'rating' column has the invalid value '|'
df = df.filter(col('rating') != '|')

In [45]:
df = df.withColumn('rating', col('rating').cast('float'))

In [46]:
from pyspark.sql.functions import regexp_replace

# Remove commas from 'rating_count' column and preserve non-empty valid values
df = df.withColumn('rating_count', 
                  regexp_replace(col('rating_count'), ',', ''))

# Convert the 'rating_count' column to integer
df = df.withColumn('rating_count', col('rating_count').cast('int'))

## Transformed data

In [47]:
# Drop columns from the DataFrame
df = df.drop('user_id', 'user_name', 'review_id', 'review_title', 'review_content', 
             'img_link', 'product_link', 'about_product')

In [56]:
from pyspark.sql.window import Window

# Generate sequential 'id' using row_number()

window_spec = Window.orderBy(F.lit(1))  # Window specification for sequential order

df = df.withColumn('id', F.row_number().over(window_spec))

In [57]:
df = df.select('id','product_id','product_name','category','discounted_price','actual_price','discount_percentage','rating','rating_count')

In [58]:
df.printSchema()

root
 |-- id: integer (nullable = false)
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- discounted_price: float (nullable = true)
 |-- actual_price: float (nullable = true)
 |-- discount_percentage: float (nullable = true)
 |-- rating: float (nullable = true)
 |-- rating_count: integer (nullable = true)



## Data Modelling: fact_table & dimention table

In [61]:
# Select specific columns from the DataFrame as product_dim dimention table

product_dim = df.select('id', 'product_id', 'product_name', 'category')

In [64]:
# Drop the specified columns and add 'id' column

fact_table = df.drop('product_id', 'product_name', 'category')

In [65]:
fact_table.show()
product_dim.show()

24/11/19 11:25:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/19 11:25:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/19 11:25:22 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/19 11:25:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/19 11:25:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/19 11:25:32 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/19 1

+---+----------------+------------+-------------------+------+------------+
| id|discounted_price|actual_price|discount_percentage|rating|rating_count|
+---+----------------+------------+-------------------+------+------------+
|  1|           199.0|       999.0|               80.0|   3.1|           2|
|  2|           379.0|       999.0|               62.0|   4.3|        3096|
|  3|           299.0|       799.0|               63.0|   4.2|        2117|
|  4|           599.0|         1.0|               57.0|   4.1|       14560|
|  5|           999.0|         1.0|               38.0|   4.3|       12093|
|  6|           539.0|       720.0|               25.0|   4.1|       36017|
|  7|           559.0|         1.0|               45.0|   4.1|       17325|
|  8|             1.0|         4.0|               71.0|   3.6|          63|
|  9|           699.0|         1.0|               59.0|   4.1|        3524|
| 10|           599.0|         1.0|               60.0|   4.1|      161679|
| 11|       

24/11/19 11:25:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/19 11:25:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/19 11:25:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/19 11:25:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+---+----------+--------------------+--------------------+
| id|product_id|        product_name|            category|
+---+----------+--------------------+--------------------+
|  1|B0B3JSWG81|              NGI St|        Home&Kitchen|
|  2|B07GMFY9QM|SOFLIN Egg Boiler...|        Home&Kitchen|
|  3|B07924P3C5|                  St|Computers&Accesso...|
|  4|B07QCWY5XV|Mobilife Bluetoot...|         Electronics|
|  5|B07KRCW6LZ|TP-Link Nano AC60...|Computers&Accesso...|
|  6|B00ABMASXG|Bajaj Immersion R...|        Home&Kitchen|
|  7|B01N6IJG0F|                   M|        Home&Kitchen|
|  8|B08T8KWNQ9|TE™ Instant Elect...|        Home&Kitchen|
|  9|B01CS4A5V4|               Monit|        Home&Kitchen|
| 10|B07S9S86BF|boAt Bassheads 24...|         Electronics|
| 11|B0BPBXNQQT|Room Heater Warme...|        Home&Kitchen|
| 12|B09NKZXMWJ|               Flix |Computers&Accesso...|
| 13|B081FG1QYX|Wayona Type C Cab...|Computers&Accesso...|
| 14|B01DEWVZ2C|JBL C100SI Wired ...|         Electronic

                                                                                

# Exporting to BigQuery

In [66]:
# Export DataFrame to BigQuery
fact_table.write \
    .format("bigquery") \
    .option("temporaryGcsBucket", "gs://amazon-data-analysis/temp") \
    .option("table", "uber-data-analysis-442008.amazon_data.fact_table") \
    .save()

product_dim.write \
    .format("bigquery") \
    .option("temporaryGcsBucket", "gs://amazon-data-analysis/temp") \
    .option("table", "uber-data-analysis-442008.amazon_data.product_dim") \
    .save()

24/11/19 11:26:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/19 11:26:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/19 11:26:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/19 11:26:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/19 11:26:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/19 11:26:21 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/19 1