In [2]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Projet Spark") \
    .config("spark.ui.port", "0") \
    .master("local[*]") \
    .getOrCreate()


23/05/26 21:48:13 WARN Utils: Your hostname, macs-MacBook-Pro-a2s.local resolves to a loopback address: 127.0.0.1; using 192.168.43.57 instead (on interface en0)
23/05/26 21:48:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/26 21:48:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
eventsPath = "/Users/mac/big-data/events.json"


In [4]:

eventsDF = spark.read.json(eventsPath)

# Display the DataFrame
eventsDF.show()

                                                                                

+-------+------------------+----------+------------------------+----------------+--------------------+--------------------+--------------+--------------------------+-----------------+
| device|         ecommerce|event_name|event_previous_timestamp| event_timestamp|                 geo|               items|traffic_source|user_first_touch_timestamp|          user_id|
+-------+------------------+----------+------------------------+----------------+--------------------+--------------------+--------------+--------------------------+-----------------+
|  macOS|{null, null, null}|  warranty|        1593878899217692|1593878946592107|      {Montrose, MI}|                  []|        google|          1593878899217692|UA000000107379500|
|Windows|{null, null, null}|     press|        1593876662175340|1593877011756535|   {Northampton, MA}|                  []|        google|          1593876662175340|UA000000107359357|
|  macOS|{null, null, null}|  add_item|        1593878792892652|1593878815459100

In [10]:
from pyspark.sql.functions import col, explode

# Extract relevant columns and transform the data
revenueDF = eventsDF.select(
    col("device"),
    col("ecommerce"),
    col("event_name"),
    col("event_previous_timestamp"),
    col("event_timestamp"),
    col("geo"),
    explode(col("items")).alias("item"),
    col("traffic_source"),
    col("user_first_touch_timestamp"),
    col("user_id")
).select(
    col("device"),
    col("ecommerce"),
    col("event_name"),
    col("event_previous_timestamp"),
    col("event_timestamp"),
    col("geo"),
    col("item.item_id").alias("item_id"),
    col("item.item_name").alias("item_name"),
    col("item.item_revenue_in_usd").alias("price"),
    col("item.quantity").alias("quantity"),
    col("traffic_source"),
    col("user_first_touch_timestamp"),
    col("user_id")
)

# Show the resulting DataFrame
revenueDF.show()


+---------+------------------+----------+------------------------+----------------+------------------+--------+--------------------+------+--------+--------------+--------------------------+-----------------+
|   device|         ecommerce|event_name|event_previous_timestamp| event_timestamp|               geo| item_id|           item_name| price|quantity|traffic_source|user_first_touch_timestamp|          user_id|
+---------+------------------+----------+------------------------+----------------+------------------+--------+--------------------+------+--------+--------------+--------------------------+-----------------+
|    macOS|{null, null, null}|  add_item|        1593878792892652|1593878815459100|     {Salinas, CA}|M_STAN_T|Standard Twin Mat...| 595.0|       1|       youtube|          1593878455472030|UA000000107375547|
|  Android|{null, null, null}|      cart|        1593878887634182|1593878899159806|{Fayetteville, AR}|M_STAN_F|Standard Full Mat...| 945.0|       1|        google| 

In [12]:
from pyspark.sql.functions import col

# Convert the data type of the 'price' column from string to float
purchasesDF = revenueDF.withColumn('price', col('price').cast('float'))

# Show the resulting DataFrame
purchasesDF.show()


+---------+------------------+----------+------------------------+----------------+------------------+--------+--------------------+------+--------+--------------+--------------------------+-----------------+
|   device|         ecommerce|event_name|event_previous_timestamp| event_timestamp|               geo| item_id|           item_name| price|quantity|traffic_source|user_first_touch_timestamp|          user_id|
+---------+------------------+----------+------------------------+----------------+------------------+--------+--------------------+------+--------+--------------+--------------------------+-----------------+
|    macOS|{null, null, null}|  add_item|        1593878792892652|1593878815459100|     {Salinas, CA}|M_STAN_T|Standard Twin Mat...| 595.0|       1|       youtube|          1593878455472030|UA000000107375547|
|  Android|{null, null, null}|      cart|        1593878887634182|1593878899159806|{Fayetteville, AR}|M_STAN_F|Standard Full Mat...| 945.0|       1|        google| 

In [16]:
from pyspark.sql.functions import col

distinctDF = purchasesDF.dropDuplicates(['device', 'ecommerce', 'event_name', 'event_previous_timestamp', 'event_timestamp', 'geo', 'item_id', 'item_name', 'price', 'quantity', 'traffic_source', 'user_first_touch_timestamp', 'user_id'])

distinctDF.show()


[Stage 4:>                                                          (0 + 1) / 1]

+---------+------------------+-------------+------------------------+----------------+--------------------+--------+--------------------+------+--------+--------------+--------------------------+-----------------+
|   device|         ecommerce|   event_name|event_previous_timestamp| event_timestamp|                 geo| item_id|           item_name| price|quantity|traffic_source|user_first_touch_timestamp|          user_id|
+---------+------------------+-------------+------------------------+----------------+--------------------+--------+--------------------+------+--------+--------------+--------------------------+-----------------+
|    macOS|{null, null, null}|     register|        1593877536537378|1593877547247035|     {Salisbury, MD}|M_STAN_T|Standard Twin Mat...| 595.0|       1|        direct|          1593877272789441|UA000000107364830|
|    macOS|{null, null, null}|     add_item|        1593876710692540|1593878204108355|       {Edmonds, WA}|M_STAN_F|Standard Full Mat...| 945.0|

                                                                                

In [17]:
from pyspark.sql.functions import col

cleanDF = purchasesDF.filter(col("device").isNotNull() & col("ecommerce").isNotNull() & col("event_name").isNotNull() & col("event_previous_timestamp").isNotNull() & col("event_timestamp").isNotNull() & col("geo").isNotNull() & col("item_id").isNotNull() & col("item_name").isNotNull() & col("price").isNotNull() & col("quantity").isNotNull() & col("traffic_source").isNotNull() & col("user_first_touch_timestamp").isNotNull() & col("user_id").isNotNull())

cleanDF.show()


+---------+------------------+----------+------------------------+----------------+------------------+--------+--------------------+------+--------+--------------+--------------------------+-----------------+
|   device|         ecommerce|event_name|event_previous_timestamp| event_timestamp|               geo| item_id|           item_name| price|quantity|traffic_source|user_first_touch_timestamp|          user_id|
+---------+------------------+----------+------------------------+----------------+------------------+--------+--------------------+------+--------+--------------+--------------------------+-----------------+
|    macOS|{null, null, null}|  add_item|        1593878792892652|1593878815459100|     {Salinas, CA}|M_STAN_T|Standard Twin Mat...| 595.0|       1|       youtube|          1593878455472030|UA000000107375547|
|  Android|{null, null, null}|      cart|        1593878887634182|1593878899159806|{Fayetteville, AR}|M_STAN_F|Standard Full Mat...| 945.0|       1|        google| 

In [19]:
from pyspark.sql.functions import col, sum, avg

trafficDF = cleanDF.groupBy("device", "ecommerce", "event_name", "event_previous_timestamp",
                            "event_timestamp", "geo", "item_id", "item_name", "traffic_source",
                            "user_first_touch_timestamp", "user_id"
                           ).agg(sum(col("price") * col("quantity")).alias("total_rev"),
                                  avg(col("price") * col("quantity")).alias("avg_rev")
                                 )

trafficDF.show()


[Stage 8:>                                                          (0 + 1) / 1]

+-------+------------------+-------------+------------------------+----------------+--------------------+--------+--------------------+--------------+--------------------------+-----------------+---------+-------+
| device|         ecommerce|   event_name|event_previous_timestamp| event_timestamp|                 geo| item_id|           item_name|traffic_source|user_first_touch_timestamp|          user_id|total_rev|avg_rev|
+-------+------------------+-------------+------------------------+----------------+--------------------+--------+--------------------+--------------+--------------------------+-----------------+---------+-------+
|  macOS|{null, null, null}|     checkout|        1593877025280682|1593878219791606|  {Eden Prairie, MN}|M_STAN_T|Standard Twin Mat...|     instagram|          1593876442539386|UA000000107357470|   2380.0| 2380.0|
|Windows|{null, null, null}|      cc_info|        1593799634298217|1593799703513611|      {McKinney, TX}|M_STAN_Q|Standard Queen Ma...|         

                                                                                

In [20]:
from pyspark.sql.functions import desc

topTrafficDF = trafficDF.orderBy(desc("total_rev")).limit(5)

topTrafficDF.show()


+---------+------------------+-------------+------------------------+----------------+-------------------+--------+--------------------+--------------+--------------------------+-----------------+---------+-------+
|   device|         ecommerce|   event_name|event_previous_timestamp| event_timestamp|                geo| item_id|           item_name|traffic_source|user_first_touch_timestamp|          user_id|total_rev|avg_rev|
+---------+------------------+-------------+------------------------+----------------+-------------------+--------+--------------------+--------------+--------------------------+-----------------+---------+-------+
|  Windows|{null, null, null}|     add_item|        1593684004933192|1593684007001066|{National City, CA}|M_PREM_F|Premium Full Matt...|         email|          1593438651248551|UA000000106029951|   6102.0| 6102.0|
|      iOS|{null, null, null}|     checkout|        1593392493331752|1593392616382708|  {San Leandro, CA}|M_STAN_K|Standard King Mat...|    

In [21]:
from pyspark.sql.functions import round

finalDF = topTrafficDF.withColumn("total_rev", round("total_rev", 2))
finalDF = finalDF.withColumn("avg_rev", round("avg_rev", 2))

finalDF.show()


[Stage 14:>                                                         (0 + 1) / 1]

+---------+------------------+-------------+------------------------+----------------+-------------------+--------+--------------------+--------------+--------------------------+-----------------+---------+-------+
|   device|         ecommerce|   event_name|event_previous_timestamp| event_timestamp|                geo| item_id|           item_name|traffic_source|user_first_touch_timestamp|          user_id|total_rev|avg_rev|
+---------+------------------+-------------+------------------------+----------------+-------------------+--------+--------------------+--------------+--------------------------+-----------------+---------+-------+
|  Windows|{null, null, null}|     add_item|        1593684004933192|1593684007001066|{National City, CA}|M_PREM_F|Premium Full Matt...|         email|          1593438651248551|UA000000106029951|   6102.0| 6102.0|
|      iOS|{null, null, null}|     checkout|        1593392493331752|1593392616382708|  {San Leandro, CA}|M_STAN_K|Standard King Mat...|    

                                                                                

In [23]:

finalDF.write.mode("overwrite").parquet("/Users/mac/big-data/final/")


                                                                                

In [25]:
git init.

SyntaxError: invalid syntax (2835759911.py, line 1)