In [2]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("RDDExample").getOrCreate()

In [4]:
df_sales=spark.read.option("header",True).option("inferschema",True).csv("sales_info.csv")
df_product=spark.read.option("header",True).option("inferschema",True).csv("product_info.csv")

In [6]:
df_sales.show()

+--------------+----------+-----------+-------------+-------------------+
|transaction_id|product_id|customer_id|quantity_sold|          timestamp|
+--------------+----------+-----------+-------------+-------------------+
|             1|       101|        201|            5|2023-09-22 10:15:00|
|             2|       102|        202|            3|2023-09-22 11:30:00|
|             3|       101|        203|            2|2023-09-22 12:45:00|
|             4|       103|        204|            1|2023-09-22 14:00:00|
|             5|       102|        205|            4|2023-09-22 15:15:00|
+--------------+----------+-----------+-------------+-------------------+



In [8]:
df_product.show()

+----------+------------+-----------+-----+
|product_id|product_name|   category|price|
+----------+------------+-----------+-----+
|       101|      Laptop|Electronics|  800|
|       102|  Smartphone|Electronics|  600|
|       103|        Desk|  Furniture|  250|
|       104|  Headphones|Electronics|  100|
|       105|       Chair|  Furniture|  150|
+----------+------------+-----------+-----+



In [10]:
df_sales.join(df_product, on='product_id').show() #mentioning product_id is not necessary as PySpark is smart enough to identify on its own

+----------+--------------+-----------+-------------+-------------------+------------+-----------+-----+
|product_id|transaction_id|customer_id|quantity_sold|          timestamp|product_name|   category|price|
+----------+--------------+-----------+-------------+-------------------+------------+-----------+-----+
|       101|             1|        201|            5|2023-09-22 10:15:00|      Laptop|Electronics|  800|
|       102|             2|        202|            3|2023-09-22 11:30:00|  Smartphone|Electronics|  600|
|       101|             3|        203|            2|2023-09-22 12:45:00|      Laptop|Electronics|  800|
|       103|             4|        204|            1|2023-09-22 14:00:00|        Desk|  Furniture|  250|
|       102|             5|        205|            4|2023-09-22 15:15:00|  Smartphone|Electronics|  600|
+----------+--------------+-----------+-------------+-------------------+------------+-----------+-----+



In [12]:
df_sales.join(df_product, on=df_sales['product_id']==df_product['product_id']).show()

+--------------+----------+-----------+-------------+-------------------+----------+------------+-----------+-----+
|transaction_id|product_id|customer_id|quantity_sold|          timestamp|product_id|product_name|   category|price|
+--------------+----------+-----------+-------------+-------------------+----------+------------+-----------+-----+
|             1|       101|        201|            5|2023-09-22 10:15:00|       101|      Laptop|Electronics|  800|
|             2|       102|        202|            3|2023-09-22 11:30:00|       102|  Smartphone|Electronics|  600|
|             3|       101|        203|            2|2023-09-22 12:45:00|       101|      Laptop|Electronics|  800|
|             4|       103|        204|            1|2023-09-22 14:00:00|       103|        Desk|  Furniture|  250|
|             5|       102|        205|            4|2023-09-22 15:15:00|       102|  Smartphone|Electronics|  600|
+--------------+----------+-----------+-------------+-------------------

In [18]:
df_sales_product=df_sales.join(df_product, on=df_sales['product_id']==df_product['product_id'],how="left")

In [17]:
df_sales.join(df_product).select("transaction_id","quantity_sold","product_name","price").show()

+--------------+-------------+------------+-----+
|transaction_id|quantity_sold|product_name|price|
+--------------+-------------+------------+-----+
|             1|            5|      Laptop|  800|
|             1|            5|  Smartphone|  600|
|             1|            5|        Desk|  250|
|             1|            5|  Headphones|  100|
|             1|            5|       Chair|  150|
|             2|            3|      Laptop|  800|
|             2|            3|  Smartphone|  600|
|             2|            3|        Desk|  250|
|             2|            3|  Headphones|  100|
|             2|            3|       Chair|  150|
|             3|            2|      Laptop|  800|
|             3|            2|  Smartphone|  600|
|             3|            2|        Desk|  250|
|             3|            2|  Headphones|  100|
|             3|            2|       Chair|  150|
|             4|            1|      Laptop|  800|
|             4|            1|  Smartphone|  600|


In [21]:
df_sales_product.filter("transaction_id=1").show() #can use where insteadof filter, same result 
#df_sales_product.where("transaction_id=1").show()
#df_sales_product.where(col("transaction_id")==1).show()

+--------------+----------+-----------+-------------+-------------------+----------+------------+-----------+-----+
|transaction_id|product_id|customer_id|quantity_sold|          timestamp|product_id|product_name|   category|price|
+--------------+----------+-----------+-------------+-------------------+----------+------------+-----------+-----+
|             1|       101|        201|            5|2023-09-22 10:15:00|       101|      Laptop|Electronics|  800|
+--------------+----------+-----------+-------------+-------------------+----------+------------+-----------+-----+



In [23]:
employees = [(1, "Scott", "Tiger", 1000.0, 
                      "united states", "+1 123 456 7890", "123 45 6789"
                     ),
                     (2, "Henry", "Ford", 1250.0, 
                      "India", "+91 234 567 8901", "456 78 9123"
                     ),
                     (3, "Nick", "Junior", 750.0, 
                      "united KINGDOM", "+44 111 111 1111", "222 33 4444"
                     ),
                     (4, "Bill", "Gomes", 1500.0, 
                      "AUSTRALIA", "+61 987 654 3210", "789 12 6118"
                     )
                ]

In [45]:
employeesDF = spark. \
    createDataFrame(employees,
                    schema="""employee_id INT, first_name STRING, 
                    last_name STRING, salary FLOAT, nationality STRING,
                    phone_number STRING, ssn STRING"""
                   )

In [46]:
employeesDF.show()

+-----------+----------+---------+------+--------------+----------------+-----------+
|employee_id|first_name|last_name|salary|   nationality|    phone_number|        ssn|
+-----------+----------+---------+------+--------------+----------------+-----------+
|          1|     Scott|    Tiger|1000.0| united states| +1 123 456 7890|123 45 6789|
|          2|     Henry|     Ford|1250.0|         India|+91 234 567 8901|456 78 9123|
|          3|      Nick|   Junior| 750.0|united KINGDOM|+44 111 111 1111|222 33 4444|
|          4|      Bill|    Gomes|1500.0|     AUSTRALIA|+61 987 654 3210|789 12 6118|
+-----------+----------+---------+------+--------------+----------------+-----------+



In [48]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
employeesDF.withColumn("nationality",upper(col("nationality"))).show()

+-----------+----------+---------+------+--------------+----------------+-----------+
|employee_id|first_name|last_name|salary|   nationality|    phone_number|        ssn|
+-----------+----------+---------+------+--------------+----------------+-----------+
|          1|     Scott|    Tiger|1000.0| UNITED STATES| +1 123 456 7890|123 45 6789|
|          2|     Henry|     Ford|1250.0|         INDIA|+91 234 567 8901|456 78 9123|
|          3|      Nick|   Junior| 750.0|UNITED KINGDOM|+44 111 111 1111|222 33 4444|
|          4|      Bill|    Gomes|1500.0|     AUSTRALIA|+61 987 654 3210|789 12 6118|
+-----------+----------+---------+------+--------------+----------------+-----------+



In [49]:
employeesDF.withColumn("ssn_last_4_digits",expr('substring(ssn,-4)')).show()

+-----------+----------+---------+------+--------------+----------------+-----------+-----------------+
|employee_id|first_name|last_name|salary|   nationality|    phone_number|        ssn|ssn_last_4_digits|
+-----------+----------+---------+------+--------------+----------------+-----------+-----------------+
|          1|     Scott|    Tiger|1000.0| united states| +1 123 456 7890|123 45 6789|             6789|
|          2|     Henry|     Ford|1250.0|         India|+91 234 567 8901|456 78 9123|             9123|
|          3|      Nick|   Junior| 750.0|united KINGDOM|+44 111 111 1111|222 33 4444|             4444|
|          4|      Bill|    Gomes|1500.0|     AUSTRALIA|+61 987 654 3210|789 12 6118|             6118|
+-----------+----------+---------+------+--------------+----------------+-----------+-----------------+



In [53]:
employeesDF.withColumn("nationality", upper(col("nationality"))).\
withColumn("last_4_ssn",expr('substring(ssn,-4)')).\
withColumn("Country Code", split(col("phone_number"), " ").getItem(0)).\
withColumn("Area Code", split(col("phone_number"), " ").getItem(1)).show()

+-----------+----------+---------+------+--------------+----------------+-----------+----------+------------+---------+
|employee_id|first_name|last_name|salary|   nationality|    phone_number|        ssn|last_4_ssn|Country Code|Area Code|
+-----------+----------+---------+------+--------------+----------------+-----------+----------+------------+---------+
|          1|     Scott|    Tiger|1000.0| UNITED STATES| +1 123 456 7890|123 45 6789|      6789|          +1|      123|
|          2|     Henry|     Ford|1250.0|         INDIA|+91 234 567 8901|456 78 9123|      9123|         +91|      234|
|          3|      Nick|   Junior| 750.0|UNITED KINGDOM|+44 111 111 1111|222 33 4444|      4444|         +44|      111|
|          4|      Bill|    Gomes|1500.0|     AUSTRALIA|+61 987 654 3210|789 12 6118|      6118|         +61|      987|
+-----------+----------+---------+------+--------------+----------------+-----------+----------+------------+---------+



In [55]:
employeesDF. \
    withColumn("nationality",upper("nationality")).\
    withColumn("ssn_last4", substring(col("ssn"), -4, 4).cast("int")).\
    withColumn("country_code", split("phone_number", " ")[0].cast("int")).\
    withColumn("area_code", split("phone_number", " ")[1].cast("int")).\
    show()

+-----------+----------+---------+------+--------------+----------------+-----------+---------+------------+---------+
|employee_id|first_name|last_name|salary|   nationality|    phone_number|        ssn|ssn_last4|country_code|area_code|
+-----------+----------+---------+------+--------------+----------------+-----------+---------+------------+---------+
|          1|     Scott|    Tiger|1000.0| UNITED STATES| +1 123 456 7890|123 45 6789|     6789|           1|      123|
|          2|     Henry|     Ford|1250.0|         INDIA|+91 234 567 8901|456 78 9123|     9123|          91|      234|
|          3|      Nick|   Junior| 750.0|UNITED KINGDOM|+44 111 111 1111|222 33 4444|     4444|          44|      111|
|          4|      Bill|    Gomes|1500.0|     AUSTRALIA|+61 987 654 3210|789 12 6118|     6118|          61|      987|
+-----------+----------+---------+------+--------------+----------------+-----------+---------+------------+---------+



In [57]:
spark

In [60]:
employeesDF.groupBy("nationality").count().show()

+--------------+-----+
|   nationality|count|
+--------------+-----+
|         India|    1|
| united states|    1|
|united KINGDOM|    1|
|     AUSTRALIA|    1|
+--------------+-----+

