In [1]:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val transactionsRDD = sc.textFile("Transactions.csv")
val schemaString = "TransID CustID TransTotal TransNumItems TransDesc"

val fields = schemaString.split(" ").
    map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

val rowRDD = transactionsRDD.
    map(_.split(",")).
    map(attributes => Row(attributes(0), attributes(1), attributes(2), attributes(3), attributes(4).trim))
val transactionsDF = spark.createDataFrame(rowRDD, schema)
transactionsDF.createOrReplaceTempView("transactions")


Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.174.130:4041
SparkContext available as 'sc' (version = 2.4.4, master = local[*], app id = local-1572977657549)
SparkSession available as 'spark'


import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
transactionsRDD: org.apache.spark.rdd.RDD[String] = Transactions.csv MapPartitionsRDD[1] at textFile at <console>:28
schemaString: String = TransID CustID TransTotal TransNumItems TransDesc
fields: Array[org.apache.spark.sql.types.StructField] = Array(StructField(TransID,StringType,true), StructField(CustID,StringType,true), StructField(TransTotal,StringType,true), StructField(TransNumItems,StringType,true), StructField(TransDesc,StringType,true))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(TransID,StringType,true), StructField(CustID,StringType,true), StructField(TransTotal,StringType,true), StructField(TransNumItems,StringType,true), StructField(TransDesc,StringType,true))
rowRDD: org.apache...

## T1:	Filter	out	(drop)	the	transactions	from	T whose	total	amount	is	less	than	$200

In [2]:
val T1 = spark.sql("""
    SELECT *
    FROM transactions
    WHERE TransTotal > 200
    """)
T1.createOrReplaceTempView("T1")
T1.show()

+-------+------+----------+-------------+--------------------+
|TransID|CustID|TransTotal|TransNumItems|           TransDesc|
+-------+------+----------+-------------+--------------------+
|      1|  6010| 244.84154|            6|00qmhpq81zp5dqqtl...|
|      2|  6010|  761.2322|            4|57twmelrupehbujpv...|
|      4|  6010| 486.15396|            4|9bmzncxfrp5eck11i...|
|      5|  6010|  519.2052|            2|10edud2zzt1c3rqg9j5c|
|      6|  6010|  866.4158|            3|v0y4avx4jqhklb9t4uu5|
|      7|  6010| 401.36148|            1|g87mafc9jcumz7zi6os3|
|      8|  6010|  811.5077|            8|yn2tztuaffb5e5s8w...|
|      9|  6010| 355.71405|            1|uvbifczc9i8fvo0vx...|
|     10|  6010| 248.01341|            8|o228lmpbzt4f81o8y...|
|     11|  6010|  493.0697|            7|gm354212anu87w6dk13z|
|     12|  6010| 874.57184|            9|a4m0jjh67m9nhvg7q...|
|     13|  6010|  362.9686|            6|ybnyeiopt3wcs2n2j...|
|     14|  6010| 499.51346|            1|mg4sbg5so0qe0r

T1: org.apache.spark.sql.DataFrame = [TransID: string, CustID: string ... 3 more fields]


## T2:	Over	T1,	group	the	transactions	by	the	Number	of	Items	it	has,	and	for	each	group	calculate	the	sum	of	total	amounts,	the	average	of	total	amounts,	the	min	and	the	max	of	the	total	amounts.	

In [3]:
var T2 = spark.sql("""
    SELECT TransNumItems, sum(TransTotal) as sum, avg(TransTotal) as avg, min(TransTotal) as min, max(TransTotal) as max
    FROM T1
    GROUP BY TransNumItems
    """)
T2.createOrReplaceTempView("T2")

T2: org.apache.spark.sql.DataFrame = [TransNumItems: string, sum: double ... 3 more fields]


## 3) Report	back	T2	to	the	client	side

In [4]:
T2.show()

+-------------+--------------------+-----------------+---------+---------+
|TransNumItems|                 sum|              avg|      min|      max|
+-------------+--------------------+-----------------+---------+---------+
|            7| 2.428920693134902E8| 600.956190653305|201.00258| 999.9943|
|            3|2.4219615261177066E8|600.2948297777007|201.00012| 999.9989|
|            8| 2.415458849476496E8|600.5825288489644|201.00223| 999.9958|
|            5| 2.427451312627901E8|600.5822393885693| 201.0001|999.99457|
|            6|2.4243184607598954E8|600.3923981980429|201.00056|999.99786|
|            9|2.4259411774223062E8|600.7853434099744|201.00473|999.99774|
|            1|2.4278203339485908E8|600.4893147901448|201.00418|999.99695|
|           10| 2.423544663999102E8| 600.929995858929|201.00343|  999.999|
|            4|2.4242956306225002E8|600.7884710812874|201.00063| 999.9983|
|            2|2.4217897769453079E8| 600.139212898242| 201.0067|999.99835|
+-------------+----------

## 4) T3:	Over	T1,	group	the	transactions	by	customer	ID,	and	for	each	group	report	the	customer	ID,	and	the	transactions’	count.	

In [5]:
var T3 = spark.sql("""
    SELECT CustID, count(*) as TransCount
    FROM T1
    GROUP BY CustID
    """)
T3.createOrReplaceTempView("T3")
T3.show()

+------+----------+
|CustID|TransCount|
+------+----------+
| 48493|        89|
| 26112|        86|
| 16974|        50|
|  8304|        94|
|  1436|        57|
| 14899|        73|
| 42923|       164|
| 20428|        37|
| 39103|       119|
| 39590|       150|
| 46525|        27|
| 39581|        80|
| 45670|        26|
| 21889|       126|
| 20569|        99|
| 21248|        36|
| 32812|        57|
| 49307|        54|
| 38672|        63|
| 22596|        87|
+------+----------+
only showing top 20 rows



T3: org.apache.spark.sql.DataFrame = [CustID: string, TransCount: bigint]


## 5) T4:	Filter	out	(drop)	the	transactions	from	T whose	total	amount	is	less	than $600

In [6]:
var T4 = spark.sql("""
    SELECT *
    FROM transactions
    WHERE TransTotal < 600
    """)
T4.createOrReplaceTempView("T4")
T4.show()

+-------+------+----------+-------------+--------------------+
|TransID|CustID|TransTotal|TransNumItems|           TransDesc|
+-------+------+----------+-------------+--------------------+
|      1|  6010| 244.84154|            6|00qmhpq81zp5dqqtl...|
|      3|  6010| 114.15211|            9|xn29zbpk98nogxp8e...|
|      4|  6010| 486.15396|            4|9bmzncxfrp5eck11i...|
|      5|  6010|  519.2052|            2|10edud2zzt1c3rqg9j5c|
|      7|  6010| 401.36148|            1|g87mafc9jcumz7zi6os3|
|      9|  6010| 355.71405|            1|uvbifczc9i8fvo0vx...|
|     10|  6010| 248.01341|            8|o228lmpbzt4f81o8y...|
|     11|  6010|  493.0697|            7|gm354212anu87w6dk13z|
|     13|  6010|  362.9686|            6|ybnyeiopt3wcs2n2j...|
|     14|  6010| 499.51346|            1|mg4sbg5so0qe0rt41...|
|     16| 47832|  180.7795|            3|xwbnln1s3gl119h2w...|
|     17| 47832|  594.5123|            2|qsc0inttrs07v3xq2...|
|     18| 47832| 495.70023|            8|bxz580ls2j054z

T4: org.apache.spark.sql.DataFrame = [TransID: string, CustID: string ... 3 more fields]


## 6) T5:	Over	T4,	group	the	transactions	by	customer	ID,	and	for	each	group	report	the	customer	ID,	and	the	transactions’	count.	

In [7]:
var T5 = spark.sql("""
    SELECT CustID, count(*) as TransCount
    FROM T4
    GROUP BY CustID
    """)
T5.createOrReplaceTempView("T5")

T5: org.apache.spark.sql.DataFrame = [CustID: string, TransCount: bigint]


## 7) T6:	Select	the	customer	IDs	whose		T5.count	*	5 <	T3.count

In [8]:
var T6 = spark.sql("""
    SELECT T5.CustID
    FROM T5
    JOIN T3
    ON T5.CustID = T3.CustID
    WHERE (T5.TransCount * 5) < T3.TransCount
    GROUP BY T5.CustID
    """)
transactionsDF.createOrReplaceTempView("T6")
T6.show()

+------+
|CustID|
+------+
| 43723|
| 31088|
+------+



T6: org.apache.spark.sql.DataFrame = [CustID: string]
