In [1]:
#mount google drive to ingest data
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!pip install pyspark==3.0.1 py4j==0.10.9

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark==3.0.1
  Downloading pyspark-3.0.1.tar.gz (204.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m204.2/204.2 MB[0m [31m6.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.6/198.6 kB[0m [31m27.1 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612247 sha256=80a3e6de63225a52efa04d077aa3b0f5ea9bbfa753febda3c85b6261ed5e3059
  Stored in directory: /root/.cache/pip/wheels/19/b0/c8/6cb894117070e130fc44352c2a13f15b6c27e440d04a84fb48
Successfully built pyspark
Installing collected packages: py4j, py

In [3]:
# Install library for finding Spark
!pip install -q findspark
# Import the libary
import findspark
# Initiate findspark
findspark.init()
# Check the location for Spark
findspark.find()

'/usr/local/lib/python3.10/dist-packages/pyspark'

In [4]:
#import sparksession
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession \
     .builder \
     .appName("Python Spark test") \
     .getOrCreate() 

In [6]:
#test if sparksession is created
spark

In [7]:
#read data from csv into pyspark
df = spark.read.format("csv").load("/content/drive/MyDrive/Salt_transactions.csv",header = True)

In [8]:
df.show()

+-----------+-----------+-----------------------+--------------+------------------------+--------------------------+-----------+--------------------+----------------------+------------------------+
|merchant_id|terminal_id|transaction_create_date|transaction_id|transaction_total_amount|merchant_registration_date|has_loyalty|transaction_currency|transaction_tip_amount|transaction_account_type|
+-----------+-----------+-----------------------+--------------+------------------------+--------------------------+-----------+--------------------+----------------------+------------------------+
|         17|    5589368|   2021-01-20 15:02:...|      25707962|                    9060|      2020-08-13 22:31:...|       null|                 ISK|                     0|                   Debit|
|         17|    5589368|   2020-09-22 15:03:...|       2857362|                    4500|      2020-08-13 22:31:...|       null|                 ISK|                     0|                    null|
|         

In [9]:
#use partiion and order by in pyspark to rank the merchant id and orderby transaction create date
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowpart = Window.partitionBy("merchant_id").orderBy("transaction_create_date")
dfwindow = df.withColumn("Rank_per_Merchantid",row_number().over(windowpart))   


In [10]:
#view result
dfwindow.show()

+-----------+-----------+-----------------------+--------------+------------------------+--------------------------+-----------+--------------------+----------------------+------------------------+-------------------+
|merchant_id|terminal_id|transaction_create_date|transaction_id|transaction_total_amount|merchant_registration_date|has_loyalty|transaction_currency|transaction_tip_amount|transaction_account_type|Rank_per_Merchantid|
+-----------+-----------+-----------------------+--------------+------------------------+--------------------------+-----------+--------------------+----------------------+------------------------+-------------------+
|         11|    5634017|   2020-09-02 14:03:...|        576987|                     100|      2020-08-26 00:18:...|       null|                 ISK|                     0|                    null|                  1|
|         11|    5634017|   2020-09-02 14:04:...|        577137|                45000000|      2020-08-26 00:18:...|       null|

In [11]:
#view schema
dfwindow.printSchema()

root
 |-- merchant_id: string (nullable = true)
 |-- terminal_id: string (nullable = true)
 |-- transaction_create_date: string (nullable = true)
 |-- transaction_id: string (nullable = true)
 |-- transaction_total_amount: string (nullable = true)
 |-- merchant_registration_date: string (nullable = true)
 |-- has_loyalty: string (nullable = true)
 |-- transaction_currency: string (nullable = true)
 |-- transaction_tip_amount: string (nullable = true)
 |-- transaction_account_type: string (nullable = true)
 |-- Rank_per_Merchantid: integer (nullable = true)



In [12]:
from pyspark.sql.functions import col, lit
dforderby_id = dfwindow.orderBy(col("merchant_id").asc(),col("Rank_per_Merchantid").asc())

In [13]:
#check result of using window function
dforderby_id.show(truncate = False)

+-----------+-----------+--------------------------+--------------+------------------------+--------------------------+-----------+--------------------+----------------------+------------------------+-------------------+
|merchant_id|terminal_id|transaction_create_date   |transaction_id|transaction_total_amount|merchant_registration_date|has_loyalty|transaction_currency|transaction_tip_amount|transaction_account_type|Rank_per_Merchantid|
+-----------+-----------+--------------------------+--------------+------------------------+--------------------------+-----------+--------------------+----------------------+------------------------+-------------------+
|10         |5485679    |2020-08-26 13:33:41.000000|162237        |1                       |2020-08-13 22:31:16.000000|null       |ISK                 |0                     |null                    |1                  |
|10         |5485679    |2020-08-26 13:46:16.000000|162762        |300                     |2020-08-13 22:31:16.0000

In [14]:
#checks
dforderby_id.filter("merchant_id == '2'").show()

+-----------+-----------+-----------------------+--------------+------------------------+--------------------------+-----------+--------------------+----------------------+------------------------+-------------------+
|merchant_id|terminal_id|transaction_create_date|transaction_id|transaction_total_amount|merchant_registration_date|has_loyalty|transaction_currency|transaction_tip_amount|transaction_account_type|Rank_per_Merchantid|
+-----------+-----------+-----------------------+--------------+------------------------+--------------------------+-----------+--------------------+----------------------+------------------------+-------------------+
|          2|    5544719|   2020-09-03 14:28:...|        685387|                     449|      2020-08-13 22:31:...|       null|                 ISK|                     0|                    null|                  1|
|          2|    5544719|   2020-09-03 14:30:...|        685662|                     570|      2020-08-13 22:31:...|       null|

In [15]:
#df.where(col("transaction_account_type").isNull()).count()

38230

In [16]:
#save as csv to your drive
dforderby_id.coalesce(1).write.csv("Teya/Teya analysis.csv", mode = 'overwrite', header = True)

In [17]:
# split date to month and yr
from pyspark.sql.functions import date_format
dforderby_id = dforderby_id.withColumn("Year",date_format("transaction_create_date",'yyyy')) \
               .withColumn("Month", date_format("transaction_create_date",'MM'))           

In [18]:
# import sql functions
from pyspark.sql.functions import col, concat_ws, sum,lit

In [19]:
#concat date to month/yr
from pyspark.sql.functions import concat,lit
dforderby_id = dforderby_id.withColumn("MonthYr", concat("Month",lit("/"), "Year"))

In [20]:
#check
dforderby_id.show()

+-----------+-----------+-----------------------+--------------+------------------------+--------------------------+-----------+--------------------+----------------------+------------------------+-------------------+----+-----+-------+
|merchant_id|terminal_id|transaction_create_date|transaction_id|transaction_total_amount|merchant_registration_date|has_loyalty|transaction_currency|transaction_tip_amount|transaction_account_type|Rank_per_Merchantid|Year|Month|MonthYr|
+-----------+-----------+-----------------------+--------------+------------------------+--------------------------+-----------+--------------------+----------------------+------------------------+-------------------+----+-----+-------+
|         10|    5485679|   2020-08-26 13:33:...|        162237|                       1|      2020-08-13 22:31:...|       null|                 ISK|                     0|                    null|                  1|2020|   08|08/2020|
|         10|    5485679|   2020-08-26 13:46:...|   

In [21]:
#aggregate merchant_id 10 using month/yr and summing up the transaction
merchant_id_10 = dforderby_id.filter(dforderby_id.merchant_id == 10)
merchant_id_10 = merchant_id_10.groupby('MonthYr').agg(sum("transaction_total_amount").alias("sum_of_transactions"))

In [22]:
#check
merchant_id_10 = merchant_id_10.orderBy(merchant_id_10.MonthYr.desc())
merchant_id_10.show()

+-------+-------------------+
|MonthYr|sum_of_transactions|
+-------+-------------------+
|10/2020|          1752544.0|
|09/2020|       1.10093542E8|
|08/2020|          1616641.0|
|06/2021|          7248735.0|
|05/2021|          7068529.0|
|04/2021|          3008022.0|
|03/2021|          6756523.0|
|02/2021|        6.0688573E7|
+-------+-------------------+



In [23]:
merchant_id_10.printSchema()

root
 |-- MonthYr: string (nullable = true)
 |-- sum_of_transactions: double (nullable = true)



In [24]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, year, month
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [25]:
data = merchant_id_10

In [26]:
# convert MonthYr to a DateType and extract year and month
data = data.withColumn("Date", to_date(data.MonthYr, "MM/yyyy")) \
           .withColumn("Year", year("Date")) \
           .withColumn("Month", month("Date"))

# aggregate the data by year and month
agg_data = data.groupBy("Year", "Month").sum("sum_of_transactions") \
               .withColumnRenamed("sum(sum_of_transactions)", "sum_of_transactions")

# split the data into training and testing sets
train_data, test_data = agg_data.randomSplit([0.7, 0.3], seed=1234)

# assemble the feature vector
assembler = VectorAssembler(inputCols=["Year", "Month"], outputCol="features")
train_data = assembler.transform(train_data)
test_data = assembler.transform(test_data)

# train the linear regression model
lr = LinearRegression(featuresCol="features", labelCol="sum_of_transactions")
lr_model = lr.fit(train_data)

# evaluate the model on the testing data
predictions = lr_model.transform(test_data)
evaluator = RegressionEvaluator(labelCol="sum_of_transactions", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on testing data:", rmse)

# predict the sum_of_transactions for the next 3 months
future_data = spark.createDataFrame([(2021, 7), (2021, 8), (2021, 9)], ["Year", "Month"])
future_data = assembler.transform(future_data)
predictions = lr_model.transform(future_data)
predictions.show()

Root Mean Squared Error (RMSE) on testing data: 55681167.47490281
+----+-----+------------+--------------------+
|Year|Month|    features|          prediction|
+----+-----+------------+--------------------+
|2021|    7|[2021.0,7.0]|-3.27084811735534...|
|2021|    8|[2021.0,8.0]|-4.874969494128418E7|
|2021|    9|[2021.0,9.0]|-6.479090870901489E7|
+----+-----+------------+--------------------+

