# Data Modelling

In [1]:
from pyspark.sql import SparkSession, functions as F

import findspark
import pandas as pd

findspark.init("C:\Program Files\Spark\spark-3.3.1-bin-hadoop3")

pd.set_option("display.max_columns", None)
pd.set_option("display.max_colwidth", None)

Imported necessary libraries.

## Build Spark Session

In [2]:
spark = (SparkSession.builder
            .appName("Spark-Cassandra-With-Catalog")
            .master("local[2]")
            .config("spark.driver.memory","2048m")
            .config("spark.sql.shuffle.partitions", 4)
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.12:3.3.0") 
            .config("spark.sql.extensions","com.datastax.spark.connector.CassandraSparkExtensions")
            .config("spark.sql.catalog.docker3nodescluster", 
                    "com.datastax.spark.connector.datasource.CassandraCatalog")
            .config("spark.sql.catalog.docker3nodescluster.spark.cassandra.connection.host", 
                    "127.0.0.1")
            .getOrCreate())

## Read CSV File

In [3]:
csv_path = "file:///Users/talha/OneDrive/Masaüstü/Talha Nebi Kumru/Data Enginnering/Miuul/NOSQL/Data_Modelling_Apache_Cassandra/datasets/iyzico_data-2.csv"
df = (spark.read.format("csv")
          .option("header", True)
          .option("sep", ",")
          .option("inferSchema", True)
          .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
          .load(csv_path))

In [4]:
df.dtypes

[('iyzico_data.csv\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x000000664\x000001750\x000001750\x0004613461137\x0014176564341\x00013155\x00 0\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00ustar  \x00train\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00train\x00\x00\x00\x00\x00\x00\x00\x00\x00\x

In [5]:
df = df.withColumnRenamed(df.dtypes[0][0], "transaction_date")

In [6]:
df.show(5)

+--------------------+-----------+--------------------+----------------+
|    transaction_date|merchant_id|            category|total_paid_price|
+--------------------+-----------+--------------------+----------------+
|2018-01-01 00:00:...|     124381|Gündelik Eşya Mağ...|         410.112|
|2018-01-01 00:01:...|     124381|Gündelik Eşya Mağ...|          485.91|
|2018-01-01 00:02:...|     124381|Gündelik Eşya Mağ...|           66.42|
|2018-01-01 00:04:...|     124381|Gündelik Eşya Mağ...|          225.09|
|2018-01-01 00:04:...|      46774|Emlak Ofisleri ve...|          479.34|
+--------------------+-----------+--------------------+----------------+
only showing top 5 rows



In [7]:
df.count()

8391254

In [8]:
spark.version

'3.3.1'

In [9]:
df.dtypes

[('transaction_date', 'string'),
 ('merchant_id', 'int'),
 ('category', 'string'),
 ('total_paid_price', 'double')]

In [49]:
df = df.withColumn("transaction_date", F.to_timestamp("transaction_date"))

## Creating Cassandra Keyspace

In [10]:
spark.conf.set("spark.cassandra.output.consistency.level", "LOCAL_ONE")

In [11]:
spark.sql(""" 
    CREATE DATABASE IF NOT EXISTS docker3nodescluster.iyzico \
    WITH DBPROPERTIES (class='SimpleStrategy', replication_factor='3')
""").show()

++
||
++
++



!docker exec -it cas1 bash<br>
!cqlsh<br>
- iyzico              
- mykeyspace  
- retail

## Control Duplicates Data

In [12]:
duplicates = df.groupBy('transaction_date', 'merchant_id', 'total_paid_price').count().alias('count') \
    .filter('count > 1')

In [13]:
duplicates.show()

+--------------------+-----------+----------------+-----+
|    transaction_date|merchant_id|total_paid_price|count|
+--------------------+-----------+----------------+-----+
|2018-01-02 17:57:...|        535|            36.0|    2|
|2018-01-03 09:42:...|      86302|           0.504|    2|
|2018-01-03 22:00:...|      42616|          143.91|    2|
|2018-01-03 22:09:...|      42616|           71.91|    2|
|2018-01-03 22:20:...|      42616|          179.82|    2|
|2018-01-03 22:23:...|      42616|           71.91|    2|
|2018-01-03 22:27:...|      42616|           71.91|    2|
|2018-01-03 22:33:...|      42616|           71.91|    2|
|2018-01-03 22:35:...|      42616|           46.71|    2|
|2018-01-03 22:37:...|      42616|           53.91|    2|
|2018-01-03 22:50:...|      42616|           71.91|    2|
|2018-01-03 22:51:...|      42616|           71.91|    3|
|2018-01-03 22:55:...|      42616|          107.91|    3|
|2018-01-03 22:56:...|      42616|           71.91|    2|
|2018-01-03 23

In [14]:
duplicates.count()

15766

In [15]:
df = df.distinct()

In [16]:
df.count()

8374318

In [17]:
duplicates = df.groupBy('transaction_date', 'merchant_id', 'total_paid_price').count().alias('count') \
    .filter('count > 1')

In [18]:
duplicates.show()

+----------------+-----------+----------------+-----+
|transaction_date|merchant_id|total_paid_price|count|
+----------------+-----------+----------------+-----+
+----------------+-----------+----------------+-----+



## EDA Operations with Spark SQL

In [19]:
df.createOrReplaceTempView("REVIEW")

Creating view table.

### NUNIQUE MERCHANT

In [20]:
spark.sql("SELECT COUNT(DISTINCT(merchant_id)) AS COUNT_UNIQUE_MERCHANT  FROM REVIEW").show()

+---------------------+
|COUNT_UNIQUE_MERCHANT|
+---------------------+
|                    7|
+---------------------+



### COUNT MERCHANT TRANSACTION

In [21]:
spark.sql("SELECT merchant_id, COUNT(merchant_id) AS COUNT FROM REVIEW \
        GROUP BY merchant_id").show()

+-----------+-------+
|merchant_id|  COUNT|
+-----------+-------+
|        535|1298529|
|      42616|1119318|
|     124381|1933755|
|      46774|1598586|
|     129316| 439667|
|      86302| 838645|
|      57192|1145818|
+-----------+-------+



### NUNIQUE CATEGORY

In [22]:
spark.sql("SELECT COUNT(DISTINCT(category)) AS COUNT_UNIQUE_CATEGORY FROM REVIEW").show()

+---------------------+
|COUNT_UNIQUE_CATEGORY|
+---------------------+
|                    7|
+---------------------+



### COUNT CATEGORY TRANSACTION

In [23]:
spark.sql("""
    SELECT category, COUNT(category) AS COUNT FROM REVIEW \
    GROUP BY category
""").show()

+--------------------+-------+
|            category|  COUNT|
+--------------------+-------+
|Endüstriyel Malze...| 838645|
|Profesyonel Hizme...|1298529|
|Emlak Ofisleri ve...|1598586|
|Çeşitli Gıda Mağa...| 439667|
|Bilgisayar Yazılı...|1145818|
|Gündelik Eşya Mağ...|1933755|
|Kadın,Erkek Giyim...|1119318|
+--------------------+-------+



### COUNT DAY OF TRANSACTION_DATE TRANSACTION

In [33]:
spark.sql("""
    SELECT DATE(transaction_date), COUNT(*) AS COUNT FROM REVIEW \
    GROUP BY DATE(transaction_date)
""").show(10)

+----------------+-----+
|transaction_date|COUNT|
+----------------+-----+
|      2018-01-02| 3531|
|      2018-01-03| 8077|
|      2018-01-20| 3611|
|      2018-01-22| 4184|
|      2018-01-25| 3662|
|      2018-01-28| 2427|
|      2018-01-31| 4475|
|      2018-02-01| 4487|
|      2018-02-02| 4395|
|      2018-02-04| 2831|
+----------------+-----+
only showing top 10 rows



### NUNIQUE DAY OF TRANSACTION_DATE

In [34]:
spark.sql("""
    SELECT COUNT(DISTINCT(DATE(transaction_date))) AS NUNIQUE_DAY FROM REVIEW
""").show()

+-----------+
|NUNIQUE_DAY|
+-----------+
|       1097|
+-----------+



### ADD DAY OF TRANSACTION_DATE

In [35]:
df = df.withColumn("trans_day", F.expr("DATE(transaction_date)"))

In [36]:
df.groupBy("merchant_id", "trans_day").agg(F.countDistinct("merchant_id", "trans_day").alias("nunique")).show()

+-----------+----------+-------+
|merchant_id| trans_day|nunique|
+-----------+----------+-------+
|        535|2018-01-01|      1|
|     124381|2018-01-03|      1|
|     129316|2018-01-04|      1|
|      46774|2018-01-05|      1|
|      86302|2018-01-14|      1|
|        535|2018-01-18|      1|
|      86302|2018-01-18|      1|
|        535|2018-01-19|      1|
|      86302|2018-01-21|      1|
|      57192|2018-01-24|      1|
|     124381|2018-01-29|      1|
|     124381|2018-01-30|      1|
|     129316|2018-01-31|      1|
|      86302|2018-02-02|      1|
|      46774|2018-02-04|      1|
|      57192|2018-02-05|      1|
|      57192|2018-02-08|      1|
|     124381|2018-02-09|      1|
|     124381|2018-02-12|      1|
|     124381|2018-02-14|      1|
+-----------+----------+-------+
only showing top 20 rows



## Create Cassandra Table with Spark

In [37]:
spark.sql("""
    DESCRIBE DATABASE docker3nodescluster.iyzico;
""").show()

+--------------+----------+
|     info_name|info_value|
+--------------+----------+
|Namespace Name|    iyzico|
+--------------+----------+



In [71]:
df.dtypes

[('trans_day', 'date'),
 ('transaction_date', 'timestamp'),
 ('category', 'string'),
 ('merchant_id', 'int'),
 ('total_paid_price', 'double')]

In [39]:
spark.sql("""
    CREATE TABLE IF NOT EXISTS docker3nodescluster.iyzico.model
    (transaction_date TIMESTAMP,
    merchant_id INT,
    category STRING,
    total_paid_price DOUBLE,
    trans_day DATE)
        USING cassandra
        PARTITIONED BY (trans_day)
        TBLPROPERTIES (clustering_key='transaction_date.desc',
                    compaction='{class=SizeTieredCompactionStrategy,bucket_high=1001}')
""")

DataFrame[]

In [40]:
cassandra = spark.read.table("docker3nodescluster.iyzico.model")

In [47]:
cassandra.show()

+---------+----------------+--------+-----------+----------------+
|trans_day|transaction_date|category|merchant_id|total_paid_price|
+---------+----------------+--------+-----------+----------------+
+---------+----------------+--------+-----------+----------------+



In [48]:
cassandra.dtypes

[('trans_day', 'date'),
 ('transaction_date', 'timestamp'),
 ('category', 'string'),
 ('merchant_id', 'int'),
 ('total_paid_price', 'double')]

In [42]:
df = df.select(*cassandra.columns)

In [43]:
df.show(5)

+----------+--------------------+--------------------+-----------+----------------+
| trans_day|    transaction_date|            category|merchant_id|total_paid_price|
+----------+--------------------+--------------------+-----------+----------------+
|2018-01-01|2018-01-01 00:01:...|Gündelik Eşya Mağ...|     124381|          485.91|
|2018-01-01|2018-01-01 00:06:...|Profesyonel Hizme...|        535|            18.0|
|2018-01-01|2018-01-01 00:19:...|Emlak Ofisleri ve...|      46774|         5191.83|
|2018-01-01|2018-01-01 00:19:...|Emlak Ofisleri ve...|      46774|          862.92|
|2018-01-01|2018-01-01 00:28:...|Gündelik Eşya Mağ...|     124381|         780.318|
+----------+--------------------+--------------------+-----------+----------------+
only showing top 5 rows



In [51]:
%%time
df.write.mode("append") \
    .format("org.apache.spark.sql.cassandra") \
    .saveAsTable("docker3nodescluster.iyzico.model")

Wall time: 4min 32s


## Read Cassandra Table with Spark

In [52]:
cassandra = spark.read.table("docker3nodescluster.iyzico.model")

In [53]:
cassandra.show(5)

+----------+-------------------+--------------------+-----------+----------------+
| trans_day|   transaction_date|            category|merchant_id|total_paid_price|
+----------+-------------------+--------------------+-----------+----------------+
|2018-02-08|2018-02-08 23:57:21|Kadın,Erkek Giyim...|      42616|          323.91|
|2018-02-08|2018-02-08 23:56:57|Bilgisayar Yazılı...|      57192|          64.188|
|2018-02-08|2018-02-08 23:56:39|Gündelik Eşya Mağ...|     124381|         303.444|
|2018-02-08|2018-02-08 23:55:50|Kadın,Erkek Giyim...|      42616|          143.82|
|2018-02-08|2018-02-08 23:54:47|Bilgisayar Yazılı...|      57192|          47.628|
+----------+-------------------+--------------------+-----------+----------------+
only showing top 5 rows



In [69]:
%%time
cassandra.filter("trans_day='2018-01-17'").count()

Wall time: 9.95 s


3861

In [70]:
%%time
cassandra.filter("category = 'Bilgisayar Yazılım Mağazaları'").count()

Wall time: 30.4 s


1060004

In [72]:
spark.stop()