### Requirements
-----

In [1]:
import pyspark
pyspark.__version__

'3.5.1'

In [2]:
import pandas
pandas.__version__

'1.5.3'

-----

IMPORTS

```bash
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.1
      /_/
         
Using Scala version 2.12.18 (OpenJDK 64-Bit Server VM, Java 21.0.3)
```

In [3]:
import os
import re
import shutil

from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import TimestampType, StringType
from pyspark.sql import functions

DEFINE & CHECK SPARK ENV

In [18]:
# Bind both the SparkSession (`sq`) and the SparkContext (`sc`) on every run.
# When a context already exists (e.g. inside the pyspark shell) only the
# message is skipped; `sq` must still be bound because later cells call
# sq.createDataFrame(...). getOrCreate() hands back the active session if
# there is one, so no second context is started.
try:
    sc
except NameError:
    print('initializing SparkContext...')
sq = SparkSession.builder.getOrCreate()
sc = sq.sparkContext
print(sc, sc.version)

<SparkContext master=local[*] appName=pyspark-shell> 3.5.1


LOAD DATA INTO RDD

In [5]:
# Read the file as an RDD of raw text lines, asking for a single partition
# (the earlier inline note suggests 5 was the alternative setting).
dataRDD = sc.textFile("data/data.csv", minPartitions=1)

# The first line carries the pipe-separated column names.
header = dataRDD.first()
schema = header.split("|")

                                                                                

FROM RDD TO DATA FRAME

In [6]:
# Every line except the header becomes one record: a list of the
# pipe-separated fields, labelled with the names taken from the header.
records = (dataRDD
           .filter(lambda line: line != header)
           .map(lambda line: line.split("|")))
df = sq.createDataFrame(records, schema)

In [7]:
# Print a tabular preview of the DataFrame built from the pipe-delimited file.
df.show()

+-------+------+----------+----------+------+------------------+-------------+------------+-------------+-----------------+-----------+
|   hits|visits|       day|identifier|orders|            amount|product_pages|direct_visit|organic_visit|paid_search_visit|email_visit|
+-------+------+----------+----------+------+------------------+-------------+------------+-------------+-----------------+-----------+
|1084135|145634|2020-04-27|        96| 45986| 3061233.890000154|       707126|      400028|       260021|              846|          6|
| 734485|111792|2020-04-30|        96| 53344|3271520.3900004006|       479824|      255051|       159261|              431|          0|
|2084615|182338|2020-04-08|        96| 11576|  908171.750000054|      1319358|      675851|       337172|            37056|         12|
|1133765|157161|2020-04-25|        96| 49829| 3398320.870000415|       720391|      416621|       237090|              801|          7|
|2473217|254864|2020-04-14|        96| 24317|202

USER-DEFINED FUNCTIONS

In [8]:
# TIME CONVERSION
def _parse_day(day):
    """Convert a 'YYYY-MM-DD' string into a datetime, passing NULLs through.

    The dashes are stripped and the remainder is parsed as '%Y%m%d'.
    Returns None for a None input instead of raising, so a single NULL
    cell does not fail the whole job.
    """
    if day is None:
        return None
    return datetime.strptime(day.replace('-', ''), '%Y%m%d')


time = functions.udf(_parse_day, TimestampType())

# ENCODING
# NULL-safe for the same reason as above.
# NOTE(review): str.encode() returns bytes while the declared return type is
# StringType -- confirm this yields the intended column values before using
# this UDF; it is not applied anywhere in the visible cells.
encode = functions.udf(lambda x: None if x is None else x.encode("utf-8"), StringType())

DATA FRAME PRE-PROCESSING

In [9]:
# Build the analysis frame from the raw DataFrame.
#
# Row selection runs first, while `identifier` still holds the raw codes, so
# the predicate ('03' / '96') visibly matches the data it is testing. Only
# afterwards is '96' relabelled to 'BR'. The resulting rows and column order
# (hits, visits, identifier, orders, amount, product_pages, date,
# total_visit, ?) are the same as applying the filter at the end of the chain.
df = (df.filter((df.hits > 1000000) & df.identifier.isin('03', '96'))
        .dropna(subset=['amount'])
        .replace('96', 'BR', "identifier")
        # parse the day string into a timestamp column
        .withColumn('date', time(df.day))
        # sum of the four per-channel visit counters
        .withColumn('total_visit', df.direct_visit+df.organic_visit+df.paid_search_visit+df.email_visit)
        # placeholder column that is NULL on every row
        .withColumn('?', lit(None))
        # the source columns of the two derived fields are no longer needed
        .drop('day', 'direct_visit', 'organic_visit', 'paid_search_visit', 'email_visit')
      )

In [10]:
# Preview the pre-processed frame: relabelled identifier, derived
# `date` / `total_visit` columns and the all-NULL `?` placeholder.
df.show()

[Stage 3:>                                                          (0 + 1) / 1]

+-------+------+----------+------+------------------+-------------+-------------------+-----------+----+
|   hits|visits|identifier|orders|            amount|product_pages|               date|total_visit|   ?|
+-------+------+----------+------+------------------+-------------+-------------------+-----------+----+
|1084135|145634|        BR| 45986| 3061233.890000154|       707126|2020-04-27 00:00:00|   660901.0|NULL|
|2084615|182338|        BR| 11576|  908171.750000054|      1319358|2020-04-08 00:00:00|  1050091.0|NULL|
|1133765|157161|        BR| 49829| 3398320.870000415|       720391|2020-04-25 00:00:00|   654519.0|NULL|
|2473217|254864|        BR| 24317|2029124.6499999256|      1503301|2020-04-14 00:00:00|  1336547.0|NULL|
|1280512|157278|        BR| 40552|2746963.0799991805|       833606|2020-04-23 00:00:00|   774423.0|NULL|
|1409119|138770|        BR|    47|3818.2699999999977|       905227|2020-04-05 00:00:00|   776881.0|NULL|
|2004905|182479|        BR|  9656|  770706.660000026|  

                                                                                

BACK TO RDD

In [11]:
# Switch from the DataFrame API back to the underlying RDD so that the
# grouping and the per-group logic below can run as plain Python functions.
rDD = df.rdd

GROUP BY

In [12]:
# Group the rows by their identifier. The key is read by column name rather
# than by position, so the grouping keeps working if the column order of the
# DataFrame ever changes.
groupedData = rDD.groupBy(lambda row: row.identifier,
                          numPartitions=1)  # the earlier inline note suggests 5 as the alternative

ALGORITHM

In [13]:
def algorithm(rows):
    """Pair every row with its successor in the given sequence.

    For each row except the last, emits a tuple
    (identifier, date, date of the next row, amount formatted to 2 decimals).
    The rows are taken in the order received; no sorting is performed.

    Args:
        rows: a sliceable sequence of objects exposing `identifier`, `date`
            and `amount` attributes (`amount` must be convertible by float()).

    Returns:
        A list with len(rows) - 1 tuples; empty for fewer than two rows.
    """
    return [
        (current.identifier, current.date, following.date, "%.2f" % float(current.amount))
        for current, following in zip(rows[:-1], rows[1:])
    ]

APPLY ALGORITHM

In [14]:
# Run the pairing algorithm once per group and flatten the per-group lists
# into a single RDD of tuples. Each group's values are materialised with
# list(...) so that algorithm() can slice them without depending on how the
# grouped iterable stores its items internally.
sessions = groupedData.flatMap(lambda group: algorithm(list(group[1])))

In [15]:
# Fetch a single result tuple to the driver as a quick sanity check.
sessions.take(1)

[('BR',
  datetime.datetime(2020, 4, 27, 0, 0),
  datetime.datetime(2020, 4, 8, 0, 0),
  '3061233.89')]

COMPUTE & SAVE TO FILE

In [16]:
outputDir = "data/sparkOutput"

# Clear any directory left behind by an earlier (possibly interrupted) run so
# the cell can be re-executed: the write below fails when the target
# directory already exists.
if os.path.isdir(outputDir):
    shutil.rmtree(outputDir)

# Write one text line per result tuple, gzip-compressed. The call returns
# nothing, so there is no need to bind its result (assigning to `_` would
# also shadow IPython's last-output variable).
sessions.saveAsTextFile(outputDir, "org.apache.hadoop.io.compress.GzipCodec")

In [17]:
# Demo clean-up: remove the output directory again.
# isdir() is already False for a path that does not exist, so it covers the
# existence check as well.
if os.path.isdir(outputDir):
    shutil.rmtree(outputDir)

24/07/05 17:36:27 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


THE END!