##Install packages

In [4]:
!apt-get install openjdk-8-jdk-headless -qq
!pip install findspark
!pip install pyspark
!wget -v https://apache.osuosl.org/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz 
!tar xf spark-3.3.0-bin-hadoop3.tgz
!ls sp*


Selecting previously unselected package openjdk-8-jre-headless:amd64.
(Reading database ... 155569 files and directories currently installed.)
Preparing to unpack .../openjdk-8-jre-headless_8u342-b07-0ubuntu1~18.04_amd64.deb ...
Unpacking openjdk-8-jre-headless:amd64 (8u342-b07-0ubuntu1~18.04) ...
Selecting previously unselected package openjdk-8-jdk-headless:amd64.
Preparing to unpack .../openjdk-8-jdk-headless_8u342-b07-0ubuntu1~18.04_amd64.deb ...
Unpacking openjdk-8-jdk-headless:amd64 (8u342-b07-0ubuntu1~18.04) ...
Setting up openjdk-8-jre-headless:amd64 (8u342-b07-0ubuntu1~18.04) ...
update-alternatives: using /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/orbd to provide /usr/bin/orbd (orbd) in auto mode
update-alternatives: using /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/servertool to provide /usr/bin/servertool (servertool) in auto mode
update-alternatives: using /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/tnameserv to provide /usr/bin/tnameserv (tnameserv) in auto mode
Setting up ope

##Set up Python environment

In [5]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.0-bin-hadoop3"
import findspark
findspark.init()
import pyspark
print(pyspark.__version__)
!java --version

3.3.0
openjdk 11.0.16 2022-07-19
OpenJDK Runtime Environment (build 11.0.16+8-post-Ubuntu-0ubuntu118.04)
OpenJDK 64-Bit Server VM (build 11.0.16+8-post-Ubuntu-0ubuntu118.04, mixed mode, sharing)


In [6]:
from google.colab import drive
drive.mount('/gdrive')
%cd /gdrive


Mounted at /gdrive
/gdrive


In [7]:
import pandas as pd
import numpy  as np
infolder = "MyDrive/MLData/spark/ml-100k"

In [18]:
from pyspark import SparkConf, SparkContext
import collections

conf = SparkConf().setMaster("local").setAppName("SparkTest")
sc = SparkContext(conf = conf)

## Working with RDD

###Calculate Average Friends by age

In [19]:
## Friends By Age
##------------------------------------------------------------##
#conf2 = SparkConf().setMaster("local").setAppName("FriendsByAge")
#sc = SparkContext(conf = conf2)

def parseLine(line):
    fields = line.split(',')
    age = int(fields[2])
    numFriends = int(fields[3])
    return (age, numFriends)

lines = sc.textFile(infolder+"/../fakefriends.csv")
rdd = lines.map(parseLine)
totalsByAge = rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
averagesByAge = totalsByAge.mapValues(lambda x: x[0] / x[1])
results = sorted(averagesByAge.collect())
for result in results:
    if result[0]<30:
      print("{:6.0f} {:6.2f}".format(result[0],result[1]))

    18 343.38
    19 213.27
    20 165.00
    21 350.88
    22 206.43
    23 246.30
    24 233.80
    25 197.45
    26 242.06
    27 228.12
    28 209.10
    29 215.92


###Total/Sum expenditure by customer 

In [20]:
def extractCustomerPricePairs(line):
    fields = line.split(',')
    return (int(fields[0]), float(fields[2]))

def res_key (res):
  return res[1]

input = sc.textFile(infolder+"/../customer-orders.csv")
mappedInput = input.map(extractCustomerPricePairs)
totalByCustomer = mappedInput.reduceByKey(lambda x, y: x + y)

results = totalByCustomer.collect();

def res_key (res):
  return res[1]
results = sorted(results,key=res_key,reverse=True)
for result in results:
    if (result[1]>5800 or result[1]<3500):
      print ("{:6.0f} {:6.2f}".format(result[0],result[1]))

    68 6375.45
    73 6206.20
    39 6193.11
    54 6065.39
    71 5995.66
     2 5994.59
    97 5977.19
    46 5963.11
    45 3309.38


###Movie Database


In [11]:
lines   = sc.textFile(infolder+"/u.data")
ratings = lines.map(lambda x: x.split()[2])
result  = ratings.countByValue()

sortedResults = collections.OrderedDict(sorted(result.items()))
for key, value in sortedResults.items():
    print("%s %i" % (key, value))


1 6110
2 11370
3 27145
4 34174
5 21201


In [12]:
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, LongType
import codecs

def loadMovieNames():
    movieNames = {}
    with codecs.open(infolder+"/u.item", "r", encoding='ISO-8859-1', errors='ignore') as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

###Calculate highest rating movie

In [13]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PopularMovies").getOrCreate()
nameDict = spark.sparkContext.broadcast(loadMovieNames())

# Create schema when reading u.data
schema = StructType([ \
                     StructField("userID", IntegerType(), True), \
                     StructField("movieID", IntegerType(), True), \
                     StructField("rating", IntegerType(), True), \
                     StructField("timestamp", LongType(), True)])
# Load up movie data as dataframe
moviesDF    = spark.read.option("sep", "\t").schema(schema).csv(infolder+"/u.data")
movieCounts = moviesDF.groupBy("movieID").count()
# Create a user-defined function to look up movie names from our broadcasted dictionary
def lookupName(movieID):
    return nameDict.value[movieID]
lookupNameUDF = func.udf(lookupName)

# Add a movieTitle column using our new udf
moviesWithNames = movieCounts.withColumn("movieTitle", lookupNameUDF(func.col("movieID")))
# Sort the results
sortedMoviesWithNames = moviesWithNames.orderBy(func.desc("count"))
# Grab the top 10
sortedMoviesWithNames.show(10, False)
# Stop the session
spark.stop()

+-------+-----+-----------------------------+
|movieID|count|movieTitle                   |
+-------+-----+-----------------------------+
|50     |583  |Star Wars (1977)             |
|258    |509  |Contact (1997)               |
|100    |508  |Fargo (1996)                 |
|181    |507  |Return of the Jedi (1983)    |
|294    |485  |Liar Liar (1997)             |
|286    |481  |English Patient, The (1996)  |
|288    |478  |Scream (1996)                |
|1      |452  |Toy Story (1995)             |
|300    |431  |Air Force One (1997)         |
|121    |429  |Independence Day (ID4) (1996)|
+-------+-----+-----------------------------+
only showing top 10 rows



###Most connected superhero in the movies

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession.builder.appName("MostPopularSuperhero").getOrCreate()

schema = StructType([ \
                     StructField("id", IntegerType(), True), \
                     StructField("name", StringType(), True)])

names = spark.read.schema(schema).option("sep", " ").csv(infolder+"/../MarvelNames.txt")
lines = spark.read.text(infolder+"/../MarvelGraph.txt")

# Small tweak vs. what's shown in the video: we trim each line of whitespace as that could
# thr off the counts.
connections = lines.withColumn("id", func.split(func.trim(func.col("value")), " ")[0]) \
              .withColumn("connections", func.size(func.split(func.trim(func.col("value")), " ")) - 1) \
              .groupBy("id").agg(func.sum("connections").alias("connections"))
    
mostPopular     = connections.sort(func.col("connections").desc()).first()
mostPopularName = names.filter(func.col("id") == mostPopular[0]).select("name").first()
print(mostPopularName[0] + " is the most popular superhero with " + str(mostPopular[1]) + " co-appearances.")

spark.stop()

CAPTAIN AMERICA is the most popular superhero with 1933 co-appearances.


###Least connected superheros in the movies

In [15]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession.builder.appName("MostObscureSuperheroes").getOrCreate()

schema = StructType([ \
                     StructField("id", IntegerType(), True), \
                     StructField("name", StringType(), True)])

names = spark.read.schema(schema).option("sep", " ").csv(infolder+"/../MarvelNames.txt")
lines = spark.read.text(infolder+"/../MarvelGraph.txt")

# Small tweak vs. what's shown in the video: we trim whitespace from each line as this
# could throw the counts off by one.
connections = lines.withColumn("id", func.split(func.trim(func.col("value")), " ")[0]) \
              .withColumn("connections", func.size(func.split(func.trim(func.col("value")), " ")) - 1) \
              .groupBy("id").agg(func.sum("connections").alias("connections"))
    
minConnectionCount = connections.agg(func.min("connections")).first()[0]
minConnections     = connections.filter(func.col("connections") == minConnectionCount)
minConnectionsWithNames = minConnections.join(names, "id")

print("The following characters have only " + str(minConnectionCount) + " connection(s):")

minConnectionsWithNames.select("name").show()
spark.stop()

The following characters have only 0 connection(s):
+--------------------+
|                name|
+--------------------+
|        BERSERKER II|
|              BLARE/|
|MARVEL BOY II/MARTIN|
|MARVEL BOY/MARTIN BU|
|      GIURESCU, RADU|
|       CLUMSY FOULUP|
|              FENRIS|
|              RANDAK|
|           SHARKSKIN|
|     CALLAHAN, DANNY|
|         DEATHCHARGE|
|                RUNE|
|         SEA LEOPARD|
|         RED WOLF II|
|              ZANTOR|
|JOHNSON, LYNDON BAIN|
|          LUNATIK II|
|                KULL|
|GERVASE, LADY ALYSSA|
+--------------------+



##ML --- Linear Regression using Spark

In [16]:
##ML
from __future__ import print_function
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors

# Create a SparkSession (Note, the config section is only for Windows!)
spark = SparkSession.builder.appName("LinearRegression").getOrCreate()

# Load up our data and convert it to the format MLLib expects.
inputLines = spark.sparkContext.textFile(infolder+"/../regression.txt")
data = inputLines.map(lambda x: x.split(",")).map(lambda x: (float(x[0]), Vectors.dense(float(x[1]))))

# Convert this RDD to a DataFrame
colNames = ["label", "features"]
df       = data.toDF(colNames)

# Note, there are lots of cases where you can avoid going from an RDD to a DataFrame.
# Perhaps you're importing data from a real database. Or you are using structured streaming
# to get your data.
# Let's split our data into training data and testing data
trainTest   = df.randomSplit([0.5, 0.5])
trainingDF  = trainTest[0]
testDF      = trainTest[1]

# Now create our linear regression model
lir   = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Train the model using our training data
model = lir.fit(trainingDF)

# Now see if we can predict values in our test data.
# Generate predictions using our linear regression model for all features in our
# test dataframe:
fullPredictions = model.transform(testDF).cache()

# Extract the predictions and the "known" correct labels.
predictions = fullPredictions.select("prediction").rdd.map(lambda x: x[0])
labels      = fullPredictions.select("label").rdd.map(lambda x: x[0])

# Zip them together
predictionAndLabel = predictions.zip(labels).collect()

# Stop the session
spark.stop()