In [41]:
from pyspark.find_spark_home import _find_spark_home
from pyspark.sql import SparkSession
from pyspark import SparkConf, StorageLevel
from pyspark.sql.functions import col, split, expr
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
import os

from dotenv import load_dotenv
# Load settings before the following cells read them through os.environ
# (presumably from a local .env file — confirm against the project setup).
load_dotenv()

# Print the Spark home directory that PySpark resolves to, as a quick sanity
# check of which installation this kernel is using.
print(_find_spark_home())

C:\Users\anama\anaconda3\envs\Bigdata\Lib\site-packages\pyspark


In [42]:
# Settings supplied through the environment; each one is None when its
# variable is not set.
#   python_path     -> interpreter handed to Spark for the driver and workers
#   app_name_dec    -> Spark application name
#   hadoop_path_dec -> location of the December events CSV
python_path, app_name_dec, hadoop_path_dec = (
    os.getenv(variable)
    for variable in ('PYTHON_PATH', 'APP_NAME_DEC', 'HADOOP_DEC_DATASET_PATH')
)

In [43]:
# Spark configuration for a session running on the local machine (local[*]).
# Parentheses replace the backslash continuations so that a stray trailing
# backslash can no longer glue the chain onto the following line.
conf = (
    SparkConf()
    .setMaster('local[*]')
    # The scratch-directory property is 'spark.local.dir'; the previous
    # 'spark-local-dir' key is not a Spark setting and had no effect.
    .set('spark.local.dir', "C:\\spark-temp")
    .set('spark.driver.memory', '4g')
    .set('spark.executor.memory', '4g')
    .set('spark.driver.maxResultSize', '2g')
    .set("spark.network.timeout", "800s")
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.shuffle.service.enabled", "true")
    .set("spark.dynamicAllocation.minExecutors", "1")
    .set("spark.dynamicAllocation.maxExecutors", "10")
    .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
)

# Only pin the Python interpreter when PYTHON_PATH was actually provided, so
# an unset variable never reaches Spark as a bogus interpreter value and the
# default interpreter is used instead.
if python_path:
    conf.set('spark.pyspark.python', python_path)
    conf.set('spark.pyspark.driver.python', python_path)

spark = SparkSession.builder.appName(app_name_dec).config(conf=conf).getOrCreate()
sc = spark.sparkContext

# Echo the effective configuration so the settings above can be verified.
for item in sc.getConf().getAll():
    print(item)

('spark.dynamicAllocation.minExecutors', '1')
('spark.app.startTime', '1702906231819')
('spark.driver.memory', '4g')
('spark.dynamicAllocation.maxExecutors', '10')
('spark.app.id', 'local-1702906233502')
('spark.driver.extraJavaOptions', '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAME

In [44]:
# Read the raw events CSV: the first line is the header and column types are
# inferred from the data.
df = spark.read.options(header=True, inferSchema=True).csv(hadoop_path_dec)
df.show()

+-------------------+----------+----------+-------------------+--------------------+-------+-------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code|  brand|  price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+-------+-------+---------+--------------------+
|2019-12-01 01:00:00|      view|   1005105|2232732093077520756|construction.tool...|  apple|1302.48|556695836|ca5eefc5-11f9-450...|
|2019-12-01 01:00:00|      view|  22700068|2232732091643068746|                null|  force| 102.96|577702456|de33debe-c7bf-44e...|
|2019-12-01 01:00:01|      view|   2402273|2232732100769874463|appliances.person...|  bosch| 313.52|539453785|5ee185a7-0689-4a3...|
|2019-12-01 01:00:02|  purchase|  26400248|2053013553056579841|computers.periphe...|   null| 132.31|535135317|61792a26-672f-4e6...|
|2019-12-01 01:00:02|      view|  20100164|2232732110089618156|    apparel.t

In [45]:
# Attach a generated 'id' column to every row.
# NOTE(review): Spark documents these ids as unique and increasing but not
# necessarily consecutive — confirm before treating them as row numbers.
df_index = df.withColumn('id', F.monotonically_increasing_id())

# Put 'id' first, followed by the original columns in their original order.
column_names = ['id', *df.columns]

df_index_first = df_index.select(*column_names)
df_index_first.show()

+---+-------------------+----------+----------+-------------------+--------------------+-------+-------+---------+--------------------+
| id|         event_time|event_type|product_id|        category_id|       category_code|  brand|  price|  user_id|        user_session|
+---+-------------------+----------+----------+-------------------+--------------------+-------+-------+---------+--------------------+
|  0|2019-12-01 01:00:00|      view|   1005105|2232732093077520756|construction.tool...|  apple|1302.48|556695836|ca5eefc5-11f9-450...|
|  1|2019-12-01 01:00:00|      view|  22700068|2232732091643068746|                null|  force| 102.96|577702456|de33debe-c7bf-44e...|
|  2|2019-12-01 01:00:01|      view|   2402273|2232732100769874463|appliances.person...|  bosch| 313.52|539453785|5ee185a7-0689-4a3...|
|  3|2019-12-01 01:00:02|  purchase|  26400248|2053013553056579841|computers.periphe...|   null| 132.31|535135317|61792a26-672f-4e6...|
|  4|2019-12-01 01:00:02|      view|  20100164|2

In [46]:
# Columns that are removed before the remaining steps of the notebook.
unused_columns = ('user_session', 'product_id', 'category_id')

df_new = df_index_first.drop(*unused_columns)
df_new.show()

+---+-------------------+----------+--------------------+-------+-------+---------+
| id|         event_time|event_type|       category_code|  brand|  price|  user_id|
+---+-------------------+----------+--------------------+-------+-------+---------+
|  0|2019-12-01 01:00:00|      view|construction.tool...|  apple|1302.48|556695836|
|  1|2019-12-01 01:00:00|      view|                null|  force| 102.96|577702456|
|  2|2019-12-01 01:00:01|      view|appliances.person...|  bosch| 313.52|539453785|
|  3|2019-12-01 01:00:02|  purchase|computers.periphe...|   null| 132.31|535135317|
|  4|2019-12-01 01:00:02|      view|    apparel.trousers|   nika| 101.68|517987650|
|  5|2019-12-01 01:00:02|      view|accessories.umbrella|   ikea| 163.56|542860793|
|  6|2019-12-01 01:00:02|      view|  electronics.clocks|   null|  88.81|538021416|
|  7|2019-12-01 01:00:03|      view|construction.tool...| xiaomi| 256.38|525740700|
|  8|2019-12-01 01:00:04|      view|  computers.notebook|    jet|  20.57|512

In [47]:
# Keep only rows in which every remaining column has a value.
df_new = df_new.na.drop(how='any')
df_new.show()

+---+-------------------+----------+--------------------+---------+-------+---------+
| id|         event_time|event_type|       category_code|    brand|  price|  user_id|
+---+-------------------+----------+--------------------+---------+-------+---------+
|  0|2019-12-01 01:00:00|      view|construction.tool...|    apple|1302.48|556695836|
|  2|2019-12-01 01:00:01|      view|appliances.person...|    bosch| 313.52|539453785|
|  4|2019-12-01 01:00:02|      view|    apparel.trousers|     nika| 101.68|517987650|
|  5|2019-12-01 01:00:02|      view|accessories.umbrella|     ikea| 163.56|542860793|
|  7|2019-12-01 01:00:03|      view|construction.tool...|   xiaomi| 256.38|525740700|
|  8|2019-12-01 01:00:04|      view|  computers.notebook|      jet|  20.57|512509221|
| 10|2019-12-01 01:00:04|      view|  computers.notebook|    vegas|  49.94|554369617|
| 11|2019-12-01 01:00:04|      view|construction.tool...|    apple|1312.52|579969851|
| 13|2019-12-01 01:00:06|      view|construction.tool.

In [48]:

# Break 'category_code' (e.g. "construction.tools.light") into
#   category -> the text before the first dot      ("construction")
#   product  -> everything after that first dot    ("tools.light")
#
# The split pattern is a regular expression, so the dot must be escaped. A raw
# string is used because "\." in a normal string literal is an invalid escape
# sequence that newer Python versions warn about.
category_parts = split(col("category_code"), r"\.")

df_new = (
    df_new
    .withColumn("category", category_parts[0])
    # Skip the category and the dot that follows it (SQL substring is 1-based).
    .withColumn("product", expr("substring(category_code, length(category) + 2)"))
)

df_new.show(truncate=False)


+---+-------------------+----------+---------------------------------+---------+-------+---------+------------+-----------------------+
|id |event_time         |event_type|category_code                    |brand    |price  |user_id  |category    |product                |
+---+-------------------+----------+---------------------------------+---------+-------+---------+------------+-----------------------+
|0  |2019-12-01 01:00:00|view      |construction.tools.light         |apple    |1302.48|556695836|construction|tools.light            |
|2  |2019-12-01 01:00:01|view      |appliances.personal.massager     |bosch    |313.52 |539453785|appliances  |personal.massager      |
|4  |2019-12-01 01:00:02|view      |apparel.trousers                 |nika     |101.68 |517987650|apparel     |trousers               |
|5  |2019-12-01 01:00:02|view      |accessories.umbrella             |ikea     |163.56 |542860793|accessories |umbrella               |
|7  |2019-12-01 01:00:03|view      |construction

In [49]:
# 'category_code' has been split into 'category' and 'product', so the
# original combined column is no longer needed.
redundant_column = "category_code"
df_new = df_new.drop(redundant_column)
df_new.show(truncate=False)

+---+-------------------+----------+---------+-------+---------+------------+-----------------------+
|id |event_time         |event_type|brand    |price  |user_id  |category    |product                |
+---+-------------------+----------+---------+-------+---------+------------+-----------------------+
|0  |2019-12-01 01:00:00|view      |apple    |1302.48|556695836|construction|tools.light            |
|2  |2019-12-01 01:00:01|view      |bosch    |313.52 |539453785|appliances  |personal.massager      |
|4  |2019-12-01 01:00:02|view      |nika     |101.68 |517987650|apparel     |trousers               |
|5  |2019-12-01 01:00:02|view      |ikea     |163.56 |542860793|accessories |umbrella               |
|7  |2019-12-01 01:00:03|view      |xiaomi   |256.38 |525740700|construction|tools.light            |
|8  |2019-12-01 01:00:04|view      |jet      |20.57  |512509221|computers   |notebook               |
|10 |2019-12-01 01:00:04|view      |vegas    |49.94  |554369617|computers   |noteb

In [50]:

# 'date' is the part of event_time before the space (yyyy-MM-dd);
# 'day' is the third dash-separated field of that date, as an integer.
event_date = split(col("event_time"), " ")[0]
day_of_month = split(col("date"), "-")[2].cast(IntegerType())

df_new = (
    df_new
    .withColumn("date", event_date)
    .withColumn("day", day_of_month)
    .drop("event_time")
)

df_new.show()


+---+----------+---------+-------+---------+------------+--------------------+----------+---+
| id|event_type|    brand|  price|  user_id|    category|             product|      date|day|
+---+----------+---------+-------+---------+------------+--------------------+----------+---+
|  0|      view|    apple|1302.48|556695836|construction|         tools.light|2019-12-01|  1|
|  2|      view|    bosch| 313.52|539453785|  appliances|   personal.massager|2019-12-01|  1|
|  4|      view|     nika| 101.68|517987650|     apparel|            trousers|2019-12-01|  1|
|  5|      view|     ikea| 163.56|542860793| accessories|            umbrella|2019-12-01|  1|
|  7|      view|   xiaomi| 256.38|525740700|construction|         tools.light|2019-12-01|  1|
|  8|      view|      jet|  20.57|512509221|   computers|            notebook|2019-12-01|  1|
| 10|      view|    vegas|  49.94|554369617|   computers|            notebook|2019-12-01|  1|
| 11|      view|    apple|1312.52|579969851|construction|   

In [51]:
# Only the numeric 'day' is kept; the intermediate 'date' column is removed.
intermediate_column = "date"
df_new = df_new.drop(intermediate_column)
df_new.show()

+---+----------+---------+-------+---------+------------+--------------------+---+
| id|event_type|    brand|  price|  user_id|    category|             product|day|
+---+----------+---------+-------+---------+------------+--------------------+---+
|  0|      view|    apple|1302.48|556695836|construction|         tools.light|  1|
|  2|      view|    bosch| 313.52|539453785|  appliances|   personal.massager|  1|
|  4|      view|     nika| 101.68|517987650|     apparel|            trousers|  1|
|  5|      view|     ikea| 163.56|542860793| accessories|            umbrella|  1|
|  7|      view|   xiaomi| 256.38|525740700|construction|         tools.light|  1|
|  8|      view|      jet|  20.57|512509221|   computers|            notebook|  1|
| 10|      view|    vegas|  49.94|554369617|   computers|            notebook|  1|
| 11|      view|    apple|1312.52|579969851|construction|         tools.light|  1|
| 13|      view|  samsung| 124.11|532554953|construction|         tools.light|  1|
| 14

In [52]:
# Report how many nulls each derived column contains.
# All four counts are computed in one aggregation (a single pass over the
# data) instead of launching a separate filter-and-count job per column.
null_check_columns = ["category", "product", "day", "brand"]

# when(...) yields a non-null marker only for null cells, and count() counts
# the non-null markers, so each aggregate is that column's null total.
null_counts = df_new.select(
    [F.count(F.when(F.col(name).isNull(), name)).alias(name) for name in null_check_columns]
).first()

print("Null values present in:")
for c in null_check_columns:
    print(c + ':', null_counts[c])

Null values present in:
category: 0
product: 0
day: 0
brand: 0


In [53]:
# Count the rows that remain after cleaning and print the total with
# thousands separators.
count = df_new.count()
print("Total number of rows: {:,d}".format(count))

Total number of rows: 53,612,307


In [54]:
# Destination directory, located underneath the input dataset path.
save_path = hadoop_path_dec + '/selected_Data/'

# Write the cleaned data as 8 CSV part files with a header row, replacing any
# previous output at that location.
csv_writer = df_new.repartition(8).write.mode('overwrite').option("header", "true")
csv_writer.csv(save_path)

# Shut the Spark session down once the export has finished.
spark.stop()