In [66]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import IntegerType

In [67]:
spark = SparkSession.builder.getOrCreate()
#specifying the data path
data_path = '/Users/hamidsakhi/git/BDM-P3/'


purchase_file_path = data_path + 'purchase.csv'
purchase_df = spark.read.format('csv').option('header','false').load(purchase_file_path)
#since the dataset doesn't have column names we should add headers here
purchase_df = purchase_df.withColumnRenamed('_c0','TransId')\
                               .withColumnRenamed('_c1','CustId')\
                               .withColumnRenamed('_c2','TransTotal')\
                               .withColumnRenamed('_c3','TransNumItem')\
                               .withColumnRenamed('_c4','TransDesc')
purchase_df.createOrReplaceTempView('purchase')

customer_file_path = data_path + 'customer.csv'
customer_df = spark.read.format('csv').option('header','false').load(customer_file_path)
#since the dataset doesn't have column names we should add headers here
customer_df = customer_df.withColumnRenamed('_c0','ID')\
                               .withColumnRenamed('_c1','Name')\
                               .withColumnRenamed('_c2','Age')\
                               .withColumnRenamed('_c3','CountryCode')\
                               .withColumnRenamed('_c4','Salary')
#creating the view 
customer_df.createOrReplaceTempView('customer')

In [68]:
T1 = spark.sql("""SELECT * FROM purchase where transtotal>600""")

purchase_df.createOrReplaceTempView('T1')

In [69]:
T2 = spark.sql(""" SELECT AVG(TransTotal) as Mean, MIN(TransTotal) As Min, Max(TransTotal) As Max 
                    FROM T1
                    GROUP BY TransNumitem""")

In [70]:
# reporting back
T2.show()

+------------------+-----+------+
|              Mean|  Min|   Max|
+------------------+-----+------+
|1004.8838597354422| 10.0|999.99|
|1004.5357413229322| 10.0|999.99|
|1004.0483739908203| 10.0|999.99|
|1004.8273086510765| 10.0|999.99|
| 1004.497867026794|10.03|999.99|
|1004.5615556615148|10.01|999.99|
|1005.7739147978656|10.01|999.99|
|1004.8441406029958| 10.0|999.99|
|1004.9988513081908| 10.0|999.97|
|1004.5041289494897|10.01|999.99|
|1005.1305228108343| 10.0|999.99|
|1004.8712438917504| 10.0|999.99|
|1006.6067367065345|10.01|999.99|
|1006.5836723757462| 10.0|999.99|
|1004.6530546314438| 10.0|999.98|
+------------------+-----+------+



In [71]:
#T3: Over T1, group the Purchases from P by customer ID for young customers between 18 and 25 years of age,
# and for each group report the customer ID, their age, and total number of items that this person has 
# purchased and total amount spent by the customer.
T3 = spark.sql(""" SELECT p.custid, sum(p.TransTotal) as total_trans, sum(TransNumItem) as total_item, Max(c.age) as age FROM T1 p, customer c 
                    where p.custid = c.id
                    and c.age between 18 and 25
                    group by p.custid
                    """).createOrReplaceTempView('T3')

In [72]:
Show_T3 = spark.sql("select * from T3").show()

+------+------------------+----------+---+
|custid|       total_trans|total_item|age|
+------+------------------+----------+---+
|  1159|100291.56999999999|     767.0| 18|
| 14887|116478.49000000002|    1007.0| 23|
| 14899|          91923.72|     770.0| 19|
|  1572|         106420.94|     900.0| 18|
| 16576|107532.04000000001|     881.0| 22|
| 17427|           99807.6|     870.0| 20|
| 18726| 81637.35999999999|     693.0| 19|
| 19132|          105694.1|     914.0| 20|
| 20512|          91074.28|     788.0| 21|
| 25969|         123332.88|     896.0| 24|
| 28316|          89832.11|     763.0| 23|
| 29539|         109549.57|     752.0| 18|
| 29573| 86866.79999999999|     651.0| 19|
| 30923| 89413.56999999999|     785.0| 21|
| 31518|103611.19000000002|     820.0| 21|
| 31713|         105562.56|     910.0| 22|
| 33783| 98924.45999999999|     835.0| 20|
| 35844|          89431.42|     774.0| 20|
| 36526|         101558.87|     851.0| 18|
| 40874|          93614.41|     785.0| 25|
+------+---

In [73]:
#T4: Select all customer pairs IDs (C1 and C2) from T3 where C1 is younger in age than 
# customer C2 but C1 spent in total more money than C2 but bought less items.

T4 = spark.sql("""SELECT a.custid as C1, b.custid as C2, a.age as Age1, b.age as Age2,
                round(a.total_trans,2) as Amount1, round(b.total_trans,2) as Amount2, 
                a.total_item as TotalItemCount1, b.total_item as TotalItemCount2 
                FROM T3 a, T3 b
                where a.age<b.age and
                a.total_trans > b.total_trans""").createOrReplaceTempView('T4')

In [74]:
# Report back T4 in the form (C1 ID, C2 ID, Age1, Age2, TotalAmount1, TotalAmount2, TotalItemCount1,TotalItemCount2) to the client side
T5 = spark.sql("select * from T4").show()

+-----+-----+----+----+---------+---------+---------------+---------------+
|   C1|   C2|Age1|Age2|  Amount1|  Amount2|TotalItemCount1|TotalItemCount2|
+-----+-----+----+----+---------+---------+---------------+---------------+
| 1159|14899|  18|  19|100291.57| 91923.72|          767.0|          770.0|
| 1159|17427|  18|  20|100291.57|  99807.6|          767.0|          870.0|
| 1159|18726|  18|  19|100291.57| 81637.36|          767.0|          693.0|
| 1159|20512|  18|  21|100291.57| 91074.28|          767.0|          788.0|
| 1159|28316|  18|  23|100291.57| 89832.11|          767.0|          763.0|
| 1159|29573|  18|  19|100291.57|  86866.8|          767.0|          651.0|
| 1159|30923|  18|  21|100291.57| 89413.57|          767.0|          785.0|
| 1159|33783|  18|  20|100291.57| 98924.46|          767.0|          835.0|
| 1159|35844|  18|  20|100291.57| 89431.42|          767.0|          774.0|
| 1159|40874|  18|  25|100291.57| 93614.41|          767.0|          785.0|
| 1159|41785