In [None]:
sudo -i
apt-get update
apt-get install wget
wget -P $SPARK_HOME/jars https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar 

In [1]:
import pyspark.sql.functions as f

from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.window import Window
import findspark

spark = SparkSession \
    .builder \
    .appName("PySpark") \
    .getOrCreate()
spark._jsc.hadoopConfiguration().set("google.cloud.auth.service.account.json.keyfile","gsc_key.txt")

sc = spark.sparkContext
sql_c = SQLContext(spark.sparkContext)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/11 08:20:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
#date format YYYY-MM-DD 
DATE_START="2019-10-01" 
DATE_END="2019-10-03" 
#Path et fileName destination
DESTINATION="gs://projets_blent/ecom/out/"

SOURCE="gs://projets_blent/ecom/blent-learning-user-ressources.s3.eu-west-3.amazonaws.com/projects/9c15cb"

24/01/11 08:21:02 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [3]:
#test avec un fichier sample  
import os
import urllib.request
from subprocess import run

# URL du fichier à télécharger
url = "https://blent-learning-user-ressources.s3.eu-west-3.amazonaws.com/projects/9c15cb/sample.csv"

# Nom du fichier de destination
destination_file = os.path.join( "sample.csv")

try:
    # Télécharger le fichier seulement s'il n'existe pas déjà
    if not os.path.exists(destination_file):
        urllib.request.urlretrieve(url, destination_file)
        print(f"Le fichier {destination_file} a été téléchargé avec succès.")
    else:
        print(f"Le fichier {destination_file} existe déjà.")
except Exception as e:
    print(f"Une erreur s'est produite : {str(e)}")

data = spark.read.csv("sample.csv",
                      header=True)
data.show(10,truncate=False)

Le fichier sample.csv a été téléchargé avec succès.
+-------------------+----------+----------+-------------------+-----------------------------------+--------+-------+---------+------------------------------------+
|event_time         |event_type|product_id|category_id        |category_code                      |brand   |price  |user_id  |user_session                        |
+-------------------+----------+----------+-------------------+-----------------------------------+--------+-------+---------+------------------------------------+
|2019-10-01 00:00:00|view      |44600062  |2103807459595387724|null                               |shiseido|35.79  |541312140|72d76fde-8bb3-4e00-8c23-a032dfed738c|
|2019-10-01 00:00:00|view      |3900821   |2053013552326770905|appliances.environment.water_heater|aqua    |33.2   |554748717|9333dfbd-b87a-4708-9857-6336556b0fcc|
|2019-10-01 00:00:01|view      |17200506  |2053013559792632471|furniture.living_room.sofa         |null    |543.1  |519107250|56

In [27]:
# Chargements des fichiers csv du bucket
data = spark.read.csv(SOURCE+"/2019-Oct.csv",
                      header=True)

data.show(10,truncate=False)

+-----------------------+----------+----------+-------------------+-----------------------------------+--------+-------+---------+------------------------------------+
|event_time             |event_type|product_id|category_id        |category_code                      |brand   |price  |user_id  |user_session                        |
+-----------------------+----------+----------+-------------------+-----------------------------------+--------+-------+---------+------------------------------------+
|2019-10-01 00:00:00 UTC|view      |44600062  |2103807459595387724|null                               |shiseido|35.79  |541312140|72d76fde-8bb3-4e00-8c23-a032dfed738c|
|2019-10-01 00:00:00 UTC|view      |3900821   |2053013552326770905|appliances.environment.water_heater|aqua    |33.20  |554748717|9333dfbd-b87a-4708-9857-6336556b0fcc|
|2019-10-01 00:00:01 UTC|view      |17200506  |2053013559792632471|furniture.living_room.sofa         |null    |543.10 |519107250|566511c2-e2e3-422b-b695-cf8e6e

In [4]:
#conversion des colonnes et filtre sur les dates spécifiés en paramètres

data = data.withColumn("event_time", f.col("event_time").cast("timestamp"))\
           .filter( ( f.col("event_time")>= DATE_START) &  ( f.col("event_time") <= DATE_END ) )\
           .withColumn("product_id", f.col("product_id").cast("int"))\
           .withColumn("category_id", f.col("product_id").cast("int"))\
           .withColumn("price", f.col("price").cast("float"))\
           .withColumn("user_id", f.col("user_id").cast("int"))

data.printSchema()
print('Filtre du '+DATE_START+ " au " + DATE_END)
print( str(data.count()) + ' lignes')
#1244245 

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: float (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)

Filtre du 2019-10-01 au 2019-10-03


[Stage 2:>                                                          (0 + 3) / 3]

1244245 lignes


                                                                                

In [5]:
#granularité cible : user_session/product_id
#avec calcul colonne purcharsed/num_view_product
data_cible = data.withColumn(
                        'nb_purchased',
                        f.when(f.col('event_type') == 'purchase', 1).otherwise(0)
                  ) \
                 .withColumn(
                        'nb_views',
                        f.when(f.col('event_type') == 'view', 1).otherwise(0)
                    ) \
                 .groupBy("product_id","user_session","user_id","category_id","category_code","brand","price") \
                 .agg(
                    f.sum('nb_purchased').alias('nb_purchased'),
                    f.sum('nb_views').alias('num_views_product')
                 )\
                 .withColumn(
                        'purchased',
                         f.when(f.col('nb_purchased') > 0, f.lit(True)).otherwise(f.lit(False))
                  )

print( str(data.count()) + ' lignes dans data_cible')
data_cible.show(5)


                                                                                

1244245 lignes dans data_cible


[Stage 8:>                                                          (0 + 3) / 3]

+----------+--------------------+---------+-----------+--------------------+-------+------+------------+-----------------+---------+
|product_id|        user_session|  user_id|category_id|       category_code|  brand| price|nb_purchased|num_views_product|purchased|
+----------+--------------------+---------+-----------+--------------------+-------+------+------------+-----------------+---------+
|   1307067|7c90fc70-0e80-459...|550050854|    1307067|  computers.notebook| lenovo|251.74|           0|                2|    false|
|  18500019|37b4834e-2519-7fc...|516285392|   18500019|  electronics.tablet|  wacom|108.63|           0|                1|    false|
|   1005115|44fa61e3-efed-48d...|512769168|    1005115|electronics.smart...|  apple|975.57|           0|                1|    false|
|  28711902|ad4beff7-d2e4-421...|547469497|   28711902|apparel.shoes.sli...|bugatti| 47.62|           0|                5|    false|
|   1005160|b1404e42-94e4-4ef...|539456226|    1005160|electronics.sm

                                                                                

In [21]:
#granularité  : user_session
#avec calcul colonnes 
data_session = data.withColumn(
                        'nb_views',
                        f.when(f.col('event_type') == 'view', 1).otherwise(0)
                    ) \
                 .groupBy("user_session","user_id") \
                 .agg(
                    f.min('event_time').alias('date_session_start'),
                    f.max('event_time').alias('date_session_end'),
                    f.sum('nb_views').alias('num_views_session')
                 )\
                 .withColumn( "hour_session_start", f.hour("date_session_start") ) \
                 .withColumn( "min_session_start", f.minute("date_session_start") ) \
                 .withColumn( "day_session_start", f.dayofweek("date_session_start") ) \
                 .withColumn( "duration",  (f.col("date_session_end") - f.col("date_session_start")))\
                 .withColumn("num_prev_sessions", f.row_number().over(Window.partitionBy("user_id").orderBy("date_session_start")))

print( str(data.count()) + ' lignes dans data_session')
data_session.show(5)

                                                                                

1244245 lignes dans data_session




+--------------------+---------+-------------------+-------------------+-----------------+------------------+-----------------+-----------------+--------------------+-----------------+
|        user_session|  user_id| date_session_start|   date_session_end|num_views_session|hour_session_start|min_session_start|day_session_start|            duration|num_prev_sessions|
+--------------------+---------+-------------------+-------------------+-----------------+------------------+-----------------+-----------------+--------------------+-----------------+
|91769fdf-461b-4e4...|244951053|2019-10-01 08:47:35|2019-10-01 08:48:28|                2|                 8|               47|                3|INTERVAL '0 00:00...|                1|
|0051531b-c007-442...|292071852|2019-10-01 17:06:51|2019-10-01 17:06:51|                1|                17|                6|                3|INTERVAL '0 00:00...|                1|
|d146126f-ce44-48d...|293291933|2019-10-01 18:58:32|2019-10-01 19:03:11|   

                                                                                

In [17]:
#Jointure des tables précédements calculés
final_data = data_cible.alias("d").join(data_session.alias("session"),
                            ( ( data_cible.user_session ==  data_session.user_session) & ( data_cible.user_id ==  data_session.user_id) )
                            ,"outer")\
                       .select("product_id","d.user_session","d.user_id" ,"category_id","category_code","brand","price","purchased"\
                               ,"num_views_product","num_views_session","hour_session_start","min_session_start","day_session_start","date_session_start"\
                               ,"duration","num_prev_sessions")\
                       .withColumn("num_prev_product_views", f.sum("d.num_views_product").over(Window().partitionBy("user_id","product_id").orderBy("session.date_session_start").rowsBetween(Window.unboundedPreceding, Window.currentRow) ))


#check calcul num_prev_product_views
#final_data.filter(f.col("user_id") == "518626190" ).show(100,truncate=False)
#final_data.groupBy("d.user_id","d.product_id").count().alias("nb").filter( f.col("nb.count").cast("int") > 1 ).show()
final_data.show(5,truncate=False)
final_data.count()
print( str(data.count()) + ' lignes dans final_data')



                                                                                

+----------+------------------------------------+---------+-----------+----------------------+--------+-----+---------+-----------------+-----------------+------------------+-----------------+-----------------+-------------------+-----------------------------------+-----------------+----------------------+
|product_id|user_session                        |user_id  |category_id|category_code         |brand   |price|purchased|num_views_product|num_views_session|hour_session_start|min_session_start|day_session_start|date_session_start |duration                           |num_prev_sessions|num_prev_product_views|
+----------+------------------------------------+---------+-----------+----------------------+--------+-----+---------+-----------------+-----------------+------------------+-----------------+-----------------+-------------------+-----------------------------------+-----------------+----------------------+
|1003535   |91769fdf-461b-4e43-9c73-88a07481b75c|244951053|1003535    |elect

[Stage 168:>                                                        (0 + 3) / 3]

1244245 lignes dans final_data


                                                                                

In [10]:
from datetime import datetime

file_name=datetime.now().strftime("%y%m%d%H%M%S")+"_EcomData_"+DATE_START+"_"+DATE_END+".csv"

#Ecriture du fichier csv final
final_data.write.option("header",True) \
                .csv(DESTINATION+"/"+file_name)

print( 'Fichier out généré : '+DESTINATION+"/"+file_name )


                                                                                

Fichier out généré :gs://projets_blent/ecom/out//240111082921_EcomData_2019-10-01_2019-10-03.csv
