# Installations and imports

In [1]:
!apt-get update -qq > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz
!tar xf spark-2.4.8-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os

# Point Java and Spark lookups at the locations used by the install cell (Colab paths).
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.8-bin-hadoop2.7",
})

import findspark

# Must run before importing pyspark so the import below resolves against SPARK_HOME.
findspark.init()

# Pre-Spark-2.0 style entry point: build a SparkConf and pass it to SparkContext.
# With this approach, other contexts (e.g. Hive, SQL) have to be created separately.
from pyspark import SparkConf, SparkContext

spark_conf = SparkConf().setAppName("YourTest").setMaster("local[*]")

# getOrCreate returns the already-running context if there is one.
sc = SparkContext.getOrCreate(spark_conf)

# CS 631 Practice

In [3]:
!wget -q https://student.cs.uwaterloo.ca/~cs451/data/ratings_small.csv
!wget -q https://student.cs.uwaterloo.ca/~cs451/data/ratings.csv

In [4]:
# List the working directory to confirm the downloaded CSVs and the Spark distribution are present.
!ls -lh

total 904M
-rw-r--r--  1 root root 677M Feb  1  2021 ratings.csv
-rw-r--r--  1 root root 2.4M Feb  1  2021 ratings_small.csv
drwxr-xr-x  1 root root 4.0K Nov 18 14:36 sample_data
drwxr-xr-x 13  501 1000 4.0K May  8  2021 spark-2.4.8-bin-hadoop2.7
-rw-r--r--  1 root root 225M May  8  2021 spark-2.4.8-bin-hadoop2.7.tgz


In [5]:
# Read the small ratings CSV as an RDD of raw text lines (one string per line).
data =sc.textFile('ratings_small.csv')
# Peek at the first 10 lines; the first one is the CSV header row.
data.take(10)

['userId,movieId,rating,timestamp',
 '1,31,2.5,1260759144',
 '1,1029,3.0,1260759179',
 '1,1061,3.0,1260759182',
 '1,1129,2.0,1260759185',
 '1,1172,4.0,1260759205',
 '1,1263,2.0,1260759151',
 '1,1287,2.0,1260759187',
 '1,1293,2.0,1260759148',
 '1,1339,3.5,1260759125']

In [12]:
# Drop the CSV header line from the small ratings file.
# The header is taken from a freshly read RDD instead of from `data` itself, so
# re-running this cell always yields the same result. Filtering `data` in place
# would, on a second run, treat the first rating row as the "header".
raw_lines = sc.textFile('ratings_small.csv')
header = raw_lines.first()
data = raw_lines.filter(lambda line: line != header)
data.take(10)

['1,31,2.5,1260759144',
 '1,1029,3.0,1260759179',
 '1,1061,3.0,1260759182',
 '1,1129,2.0,1260759185',
 '1,1172,4.0,1260759205',
 '1,1263,2.0,1260759151',
 '1,1287,2.0,1260759187',
 '1,1293,2.0,1260759148',
 '1,1339,3.5,1260759125',
 '1,1343,2.0,1260759131']

In [16]:
def _movie_rating(line):
    """Parse one 'userId,movieId,rating,timestamp' line into (movieId, rating)."""
    fields = line.split(",")
    return fields[1], float(fields[2])


# Collect all ratings of each movie under its movieId key.
rdd1 = data.map(_movie_rating).groupByKey()

In [20]:
# Average rating per movie: total of the grouped ratings divided by how many there are.
rdd1.mapValues(lambda ratings: sum(ratings) / len(ratings)).take(10)

[('1129', 3.3125),
 ('1293', 3.9782608695652173),
 ('1339', 3.298076923076923),
 ('1405', 3.032608695652174),
 ('2105', 3.478723404255319),
 ('2150', 3.513888888888889),
 ('2455', 3.393617021276596),
 ('10', 3.4508196721311477),
 ('17', 3.9244186046511627),
 ('50', 4.370646766169155)]

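groupByKey() works well when the dataset is small, but it shuffles every individual value across the cluster.
Use reduceByKey() when the dataset is large: it combines values within each partition before shuffling, so far less data is moved.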

In [21]:
# Read the full ratings file as raw text lines and drop its header row.
ratings_lines = sc.textFile('ratings.csv')
header = ratings_lines.first()
data = ratings_lines.filter(lambda line: line != header)
data.take(10)

['1,110,1.0,1425941529',
 '1,147,4.5,1425942435',
 '1,858,5.0,1425941523',
 '1,1221,5.0,1425941546',
 '1,1246,5.0,1425941556',
 '1,1968,4.0,1425942148',
 '1,2762,4.5,1425941300',
 '1,2918,5.0,1425941593',
 '1,2959,4.0,1425941601',
 '1,4226,4.0,1425942228']

In [24]:
def _to_sum_count(line):
    """Parse one ratings line into (movieId, (rating, 1)), the seed for a running (sum, count)."""
    fields = line.split(",")
    return fields[1], (float(fields[2]), 1)


def _merge_sum_count(left, right):
    """Combine two (sum, count) pairs element-wise."""
    return left[0] + right[0], left[1] + right[1]


# Per-movie (rating_sum, rating_count), aggregated with reduceByKey instead of groupByKey.
rdd1 = data.map(_to_sum_count).reduceByKey(_merge_sum_count)

In [25]:
# Inspect the first 10 (movieId, (rating_sum, rating_count)) pairs.
rdd1.take(10)

[('110', (267116.0, 66512)),
 ('147', (17861.0, 4967)),
 ('2959', (253944.5, 60024)),
 ('81834', (44783.0, 11562)),
 ('628', (47666.0, 12702)),
 ('1097', (146270.5, 39023)),
 ('157', (7657.0, 2743)),
 ('1198', (246807.5, 59693)),
 ('1201', (76664.0, 18609)),
 ('3535', (42447.0, 11875))]

In [26]:
def _mean_rating(entry):
    """Turn (movieId, (rating_sum, rating_count)) into (movieId, average rating)."""
    movie_id, (total, count) = entry
    return movie_id, total / count


final_result = rdd1.map(_mean_rating)

In [27]:
# Show the first 10 (movieId, average rating) pairs.
final_result.take(10)

[('110', 4.016057252826558),
 ('147', 3.5959331588483994),
 ('2959', 4.2307160469145675),
 ('81834', 3.8732918180245632),
 ('628', 3.752637379940167),
 ('1097', 3.748315096225303),
 ('157', 2.7914691943127963),
 ('1198', 4.134613773809324),
 ('1201', 4.119727013810522),
 ('3535', 3.574484210526316)]

Let us save the contents of this rdd into a txt file

The following code creates a folder named 'rating' that contains several text files (part files). The number of files equals the number of partitions of the RDD being saved.

In [28]:
import shutil

# saveAsTextFile refuses to write into an existing directory, so clear any output
# left behind by a previous run first. Assumes the default local filesystem, where
# the relative path resolves against the notebook's working directory.
shutil.rmtree('rating', ignore_errors=True)
final_result.saveAsTextFile('rating')

To write the contents to a single file, coalesce the RDD to one partition before saving:

In [30]:
import shutil

# saveAsTextFile refuses to write into an existing directory, so clear any output
# left behind by a previous run first (assumes the default local filesystem).
shutil.rmtree('rating_1', ignore_errors=True)
# coalesce(1) collapses the RDD to a single partition, so only one part file is written.
final_result.coalesce(1).saveAsTextFile('rating_1')

In [5]:
# Dump the SparkContext's configuration as a list of (key, value) pairs.
sc.getConf().getAll()

[('spark.app.name', 'YourTest'),
 ('spark.rdd.compress', 'True'),
 ('spark.app.id', 'local-1637887965365'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.driver.host', '15b8af5da677'),
 ('spark.master', 'local[*]'),
 ('spark.driver.port', '46267'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true')]

# Udacity Spark Practice

In Spark 1.x, there were three entry points: **SparkContext, SQLContext and HiveContext**. Since Spark 2.x, there is a new unified entry point called **SparkSession**, and that one alone is sufficient!

If you are using the spark-shell, SparkContext is already available through the variable called sc.

Reference: https://towardsdatascience.com/sparksession-vs-sparkcontext-vs-sqlcontext-vs-hivecontext-741d50c9486a

In [3]:
from pyspark.sql import SparkSession

In [16]:
# Build the SparkSession, or fetch the existing one if a session is already active.
spark = (
    SparkSession.builder
    .appName('First spark sql example')
    .getOrCreate()
)

In [17]:
# Configuration of the SparkContext that backs this SparkSession, as (key, value) pairs.
spark.sparkContext.getConf().getAll()

[('spark.rdd.compress', 'True'),
 ('spark.app.id', 'local-1637887965365'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.driver.host', '15b8af5da677'),
 ('spark.master', 'local[*]'),
 ('spark.driver.port', '46267'),
 ('spark.executor.id', 'driver'),
 ('spark.app.name', 'First spark sql example'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true')]

In [8]:
# Re-check the original context's configuration after creating the SparkSession;
# in the recorded output, spark.app.name now shows the session's app name.
sc.getConf().getAll()

[('spark.rdd.compress', 'True'),
 ('spark.app.id', 'local-1637887965365'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.driver.host', '15b8af5da677'),
 ('spark.master', 'local[*]'),
 ('spark.driver.port', '46267'),
 ('spark.executor.id', 'driver'),
 ('spark.app.name', 'First spark sql example'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true')]

In [18]:
# Display the SparkSession object as the cell output.
spark