In [1]:
import findspark
# Make the local Spark installation usable from this kernel. NOTE(review):
# findspark.init() is expected to locate SPARK_HOME and put its pyspark on
# sys.path, so it presumably has to run before the pyspark imports in later cells.
findspark.init()
# find() returns the Spark installation directory; as the cell's last
# expression it is echoed in the output as a check of which install was picked up.
findspark.find()


'C:\\Program Files\\spark-3.3.1-bin-hadoop3'

# Word Count (RDD)

In [2]:
import re
from pyspark import SparkConf, SparkContext

# Runs of non-word characters separate words; compiled once and reused for every line.
_WORD_SPLIT_RE = re.compile(r'\W+', re.UNICODE)


def normalizeWords(text):
    """Split a line of text into lower-cased words.

    Args:
        text: one line of input text.

    Returns:
        list[str]: the words of ``text``, lower-cased and split on runs of
        non-word characters. Empty tokens -- which ``re.split`` produces when
        the line is blank or starts/ends with punctuation or whitespace -- are
        dropped so that ``''`` is never counted as a word.
    """
    return [word for word in _WORD_SPLIT_RE.split(text.lower()) if word]

conf = SparkConf().setMaster("local").setAppName("WordCount")
sc = SparkContext(conf=conf)

try:
    # Named book_lines (not `input`) so the built-in input() is not shadowed
    # for the rest of the kernel session.
    book_lines = sc.textFile("C:/Users/Richard/Data Engineering/sparkcourse/book.txt")
    words = book_lines.flatMap(normalizeWords)

    # Count each word, then flip to (count, word) so sortByKey orders by count
    # (ascending, so the most frequent words are printed last).
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
    wordCountsSorted = wordCounts.map(lambda pair: (pair[1], pair[0])).sortByKey()
    results = wordCountsSorted.collect()
finally:
    # PySpark allows only one active SparkContext per process; stop this one
    # (even if a step above failed) so the following cells can create their own.
    sc.stop()

for count, word in results:
    # Drop non-ASCII characters, and skip words that end up empty.
    ascii_word = word.encode('ascii', 'ignore').decode()
    if ascii_word:
        print(ascii_word + ":\t\t" + str(count))


achieving:		1
contents:		1
preparation:		1
skillset:		1
determination:		1
confidence:		1
strike:		1
blame:		1
devoted:		1
commuted:		1
complaint:		1
rewarded:		1
role:		1
marriage:		1
combat:		1
secondary:		1
ultimatum:		1
weeks:		1
walked:		1
matches:		1
nor:		1
requirement:		1
43:		1
broke:		1
gloat:		1
heart:		1
attack:		1
65:		1
retire:		1
smarts:		1
tenacity:		1
discarding:		1
bold:		1
psyche:		1
rebel:		1
magnitude:		1
justify:		1
surprising:		1
fell:		1
laid:		1
downsized:		1
imagined:		1
pursuing:		1
roof:		1
starving:		1
foreclosed:		1
returning:		1
prototyped:		1
proven:		1
quits:		1
sba:		1
tenure:		1
secure:		1
28:		1
moderately:		1
americans:		1
representing:		1
14:		1
fringe:		1
internal:		1
religious:		1
cults:		1
brainwashing:		1
beliefs:		1
questioning:		1
instill:		1
youth:		1
grew:		1
teachers:		1
grades:		1
graduated:		1
absorbed:		1
culture:		1
promotes:		1
indoctrinated:		1
landed:		1
children:		1
terrifying:		1
barely:		1
mouths:		1
fulfill:		1
thrust:		1
foam:		

# Total Amount Spent By Each Customer (RDD)

In [None]:
from pyspark import SparkConf, SparkContext

# Spark configuration for this cell: app name plus the in-process "local" master.
conf = (
    SparkConf()
    .setAppName("CustomerSpending")
    .setMaster("local")
)
sc = SparkContext(conf=conf)

def parseLine(line):
    """Parse one ``customer-orders.csv`` line into ``(customer_id, amount_spent)``.

    Column 0 is read as an int and column 2 as a float; column 1 and any
    further columns are ignored.
    """
    columns = line.split(',')
    return int(columns[0]), float(columns[2])

try:
    lines = sc.textFile("C:/Users/Richard/Data Engineering/SparkCourse/customer-orders.csv")
    parsedLines = lines.map(parseLine)

    # Total amount spent per customer ID.
    cust_spent = parsedLines.reduceByKey(lambda a, b: a + b)

    # Sort by the total spent (the value, column 2), largest first.
    # Equivalent alternative: flip to (amount, custID) and use sortByKey(ascending=False).
    cust_spent_Sorted = cust_spent.sortBy(lambda pair: pair[1], ascending=False)

    results = cust_spent_Sorted.collect()
finally:
    # Only one SparkContext may be active at a time; release this one so the
    # next cell can create its own, even if a step above failed.
    sc.stop()

for cust, spent in results:
    print(f"{cust}: ${spent:.2f}")


# Popular Movies (RDD)

In [2]:
from pyspark import SparkConf, SparkContext

def loadMovieNames(path="ml-100k/u.ITEM", encoding="ISO-8859-1"):
    """Build a ``{movie_id: title}`` lookup from the MovieLens item file.

    Each line is ``|``-separated: field 0 is the integer movie ID and
    field 1 the title.

    Args:
        path: item file to read. The default is the original relative path,
            resolved against the kernel's working directory. NOTE(review): the
            MovieLens 100K archive ships this file as ``u.item``; the
            upper-case name only resolves on a case-insensitive filesystem
            (e.g. Windows) -- confirm against your copy.
        encoding: text encoding of the file. The MovieLens 100K item file
            contains accented (Latin-1) titles, so reading it with a UTF-8
            platform default can raise ``UnicodeDecodeError``.

    Returns:
        dict[int, str]: movie ID -> title. Blank lines are skipped.
    """
    movieNames = {}
    with open(path, encoding=encoding) as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank / trailing empty lines
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]  # movie ID -> title
    return movieNames

conf = SparkConf().setMaster("local").setAppName("PopularMovies")
sc = SparkContext(conf=conf)

try:
    # Broadcast the movie ID -> title dictionary so the map() below can look titles up.
    nameDict = sc.broadcast(loadMovieNames())

    # u.data is whitespace-separated; field 1 is the movie ID. One (id, 1) per rating.
    lines = sc.textFile("C:/Users/Richard/Data Engineering/SparkCourse/ml-100k/u.data")
    movies = lines.map(lambda line: (int(line.split()[1]), 1))
    movieCounts = movies.reduceByKey(lambda a, b: a + b)

    # Most-rated movies first.
    sortedMovies = movieCounts.sortBy(lambda pair: pair[1], ascending=False)

    # Replace each movie ID with its title via the broadcast lookup.
    sortedMoviesWithNames = sortedMovies.map(
        lambda pair: (nameDict.value[pair[0]], pair[1])
    )

    results = sortedMoviesWithNames.collect()
finally:
    # Only one SparkContext may be active at a time; release this one so later
    # cells can start their own, even if a step above failed.
    sc.stop()

for result in results:
    print(result)


('Star Wars (1977)', 583)
('Contact (1997)', 509)
('Fargo (1996)', 508)
('Return of the Jedi (1983)', 507)
('Liar Liar (1997)', 485)
('English Patient, The (1996)', 481)
('Scream (1996)', 478)
('Toy Story (1995)', 452)
('Air Force One (1997)', 431)
('Independence Day (ID4) (1996)', 429)
('Raiders of the Lost Ark (1981)', 420)
('Godfather, The (1972)', 413)
('Pulp Fiction (1994)', 394)
('Twelve Monkeys (1995)', 392)
('Silence of the Lambs, The (1991)', 390)
('Jerry Maguire (1996)', 384)
('Rock, The (1996)', 378)
('Empire Strikes Back, The (1980)', 367)
('Star Trek: First Contact (1996)', 365)
('Back to the Future (1985)', 350)
('Titanic (1997)', 350)
('Mission: Impossible (1996)', 344)
('Fugitive, The (1993)', 336)
('Indiana Jones and the Last Crusade (1989)', 331)
('Willy Wonka and the Chocolate Factory (1971)', 326)
('Princess Bride, The (1987)', 324)
('Forrest Gump (1994)', 321)
('Saint, The (1997)', 316)
('Monty Python and the Holy Grail (1974)', 316)
('Full Monty, The (1997)', 315)

# Spark SQL

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

import collections

# Start (or reuse, via getOrCreate) the SparkSession for the Spark SQL examples.
spark = (
    SparkSession.builder
    .appName("SparkSQL")
    .getOrCreate()
)

def mapper(line):
    """Turn one ``fakefriends.csv`` line into a ``Row``.

    Columns, in order: ID (int), name (str), age (int), numFriends (int).
    """
    cols = line.split(',')
    return Row(
        ID=int(cols[0]),
        name=cols[1],
        age=int(cols[2]),
        numFriends=int(cols[3]),
    )


try:
    # NOTE(review): relative path -- resolved against the kernel's working directory.
    lines = spark.sparkContext.textFile("fakefriends.csv")
    people = lines.map(mapper)

    # Infer the schema from the Row objects and register the DataFrame as a temp view.
    schemaPeople = spark.createDataFrame(people).cache()
    schemaPeople.createOrReplaceTempView("people")

    # SQL can be run over DataFrames that have been registered as a view.
    teenagers = spark.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")

    # spark.sql() returns a DataFrame; collect() brings its Rows to the driver.
    for teen in teenagers.collect():
        print(teen)

    # The same kind of query through the DataFrame API instead of SQL:
    schemaPeople.groupBy("age").count().orderBy("age").show()
finally:
    # Always release the session (and its SparkContext), even if a query fails.
    spark.stop()



Row(ID=21, name='Miles', age=19, numFriends=268)
Row(ID=52, name='Beverly', age=19, numFriends=269)
Row(ID=54, name='Brunt', age=19, numFriends=5)
Row(ID=106, name='Beverly', age=18, numFriends=499)
Row(ID=115, name='Dukat', age=18, numFriends=397)
Row(ID=133, name='Quark', age=19, numFriends=265)
Row(ID=136, name='Will', age=19, numFriends=335)
Row(ID=225, name='Elim', age=19, numFriends=106)
Row(ID=304, name='Will', age=19, numFriends=404)
Row(ID=341, name='Data', age=18, numFriends=326)
Row(ID=366, name='Keiko', age=19, numFriends=119)
Row(ID=373, name='Quark', age=19, numFriends=272)
Row(ID=377, name='Beverly', age=18, numFriends=418)
Row(ID=404, name='Kasidy', age=18, numFriends=24)
Row(ID=409, name='Nog', age=19, numFriends=267)
Row(ID=439, name='Data', age=18, numFriends=417)
Row(ID=444, name='Keiko', age=18, numFriends=472)
Row(ID=492, name='Dukat', age=19, numFriends=36)
Row(ID=494, name='Kasidy', age=18, numFriends=194)
+---+-----+
|age|count|
+---+-----+
| 18|    8|
| 19|   

# Popular Movies (with DataFrame)

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions

def loadMovieNames(path="ml-100k/u.ITEM", encoding="ISO-8859-1"):
    """Build a ``{movie_id: title}`` lookup from the MovieLens item file.

    Each line is ``|``-separated: field 0 is the integer movie ID and
    field 1 the title.

    Args:
        path: item file to read. The default is the original relative path,
            resolved against the kernel's working directory. NOTE(review): the
            MovieLens 100K archive ships this file as ``u.item``; the
            upper-case name only resolves on a case-insensitive filesystem
            (e.g. Windows) -- confirm against your copy.
        encoding: text encoding of the file. The MovieLens 100K item file
            contains accented (Latin-1) titles, so reading it with a UTF-8
            platform default can raise ``UnicodeDecodeError``.

    Returns:
        dict[int, str]: movie ID -> title. Blank lines are skipped.
    """
    movieNames = {}
    with open(path, encoding=encoding) as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank / trailing empty lines
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]  # movie ID -> title
    return movieNames

# Create a SparkSession
spark = SparkSession.builder.appName("PopularMovies").getOrCreate()

try:
    # Movie ID -> name dictionary; a plain dict, only used on the driver below.
    nameDict = loadMovieNames()

    # Raw ratings: u.data is whitespace-separated and field 1 is the movie ID.
    lines = spark.sparkContext.textFile("C:/Users/Richard/Data Engineering/SparkCourse/ml-100k/u.data")
    # One Row per rating, holding just the movie ID, then a DataFrame from those Rows.
    movies = lines.map(lambda line: Row(movieID=int(line.split()[1])))
    movieDataset = spark.createDataFrame(movies)

    # Count ratings per movie and sort by that count, most popular first.
    # Cached because it is used twice below (show() and take()).
    topMovieIDs = (
        movieDataset.groupBy("movieID")
        .count()
        .orderBy("count", ascending=False)
        .cache()
    )
    topMovieIDs.show()

    # Grab the top 10 Rows on the driver.
    top10 = topMovieIDs.take(10)

    print("\n")
    for row in top10:
        # Access by field name; Row.count would be the tuple method, not the column.
        print("%s: %d" % (nameDict[row["movieID"]], row["count"]))
finally:
    # Stop the session even if a step above fails.
    spark.stop()


+-------+-----+
|movieID|count|
+-------+-----+
|     50|  583|
|    258|  509|
|    100|  508|
|    181|  507|
|    294|  485|
|    286|  481|
|    288|  478|
|      1|  452|
|    300|  431|
|    121|  429|
|    174|  420|
|    127|  413|
|     56|  394|
|      7|  392|
|     98|  390|
|    237|  384|
|    117|  378|
|    172|  367|
|    222|  365|
|    313|  350|
+-------+-----+
only showing top 20 rows



Star Wars (1977): 583
Contact (1997): 509
Fargo (1996): 508
Return of the Jedi (1983): 507
Liar Liar (1997): 485
English Patient, The (1996): 481
Scream (1996): 478
Toy Story (1995): 452
Air Force One (1997): 431
Independence Day (ID4) (1996): 429


# Spark MLlib: Linear Regression (on DataFrame)

In [4]:
from __future__ import print_function

from pyspark.ml.regression import LinearRegression

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors

if __name__ == "__main__":

    # Seed for randomSplit so the train/test partition (and therefore the
    # printed predictions) is reproducible from run to run.
    SEED = 42

    # Create a SparkSession.
    spark = SparkSession.builder.appName("LinearRegression").getOrCreate()

    try:
        # Each line of regression.txt is "label,feature"; convert it to
        # (label, feature-vector) pairs. NOTE(review): relative path -- resolved
        # against the kernel's working directory.
        inputLines = spark.sparkContext.textFile("regression.txt")
        data = inputLines.map(lambda line: line.split(",")).map(
            lambda cols: (float(cols[0]), Vectors.dense(float(cols[1])))
        )

        # Convert this RDD to a DataFrame; LinearRegression below is given no
        # column names, so it relies on these "label"/"features" names.
        colNames = ["label", "features"]
        df = data.toDF(colNames)

        # Note, there are lots of cases where you can avoid going from an RDD to a DataFrame.
        # Perhaps you're importing data from a real database. Or you are using structured streaming
        # to get your data.

        # Split the data 50/50 into training and test sets.
        trainingDF, testDF = df.randomSplit([0.5, 0.5], seed=SEED)

        # Create the linear regression model and train it on the training data.
        lir = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
        model = lir.fit(trainingDF)

        # Generate predictions for every row of the test DataFrame.
        fullPredictions = model.transform(testDF).cache()

        # Select prediction and label together in one pass so each prediction
        # stays paired with its own label (zipping two separately derived RDDs
        # depends on identical partitioning and per-partition counts).
        predictionAndLabel = [
            (row["prediction"], row["label"])
            for row in fullPredictions.select("prediction", "label").collect()
        ]

        # Print out the predicted and actual values for each point.
        for prediction in predictionAndLabel:
            print(prediction)
    finally:
        # Stop the session even if a step above fails.
        spark.stop()


(-2.6365436897321355, -3.74)
(-1.8116310162420026, -2.58)
(-1.6857968796079148, -2.54)
(-1.657833738133673, -2.29)
(-1.5879258844480686, -2.26)
(-1.5459811722367058, -2.17)
(-1.392183894128376, -2.09)
(-1.4411193917082992, -2.07)
(-1.4271378209711783, -2.0)
(-1.3712115380226946, -1.94)
(-1.2943128989685297, -1.91)
(-1.3152852550742111, -1.91)
(-1.1754695477030022, -1.77)
(-1.1754695477030022, -1.74)
(-1.1614879769658812, -1.66)
(-1.1544971915973208, -1.65)
(-1.3013036843370902, -1.64)
(-1.2174142599143647, -1.61)
(-1.1544971915973208, -1.6)
(-1.1055616940173978, -1.57)
(-1.189451118440123, -1.53)
(-1.0356538403317932, -1.47)
(-1.1265340501230792, -1.42)
(-0.937782845171947, -1.4)
(-1.000699913488991, -1.36)
(-1.0496354110689141, -1.33)
(-0.8469026353806611, -1.27)
(-0.8608842061177819, -1.26)
(-0.8538934207492216, -1.25)
(-0.8469026353806611, -1.23)
(-0.8818565622234633, -1.17)
(-0.8888473475920238, -1.16)
(-0.7140777133780126, -1.11)
(-0.8748657768549029, -1.11)
(-0.6581514304295291, 