In [None]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
import collections
from dataclasses import dataclass, field 

# Reuse the SparkContext the kernel already has, or start one if there is none.
sc = SparkContext.getOrCreate()

# Later cells call spark.createDataFrame, spark.sql and spark.read, but nothing
# visible in this notebook defines `spark`. Obtain (or reuse) the session
# explicitly so the notebook does not rely on a kernel-injected global.
spark = SparkSession.builder.getOrCreate()

# RDD: mapValues and reduceByKey functions

In [None]:
# Load the ratings file as an RDD of text lines.
lines = sc.textFile(
    name="s3://370784835428-datalake/udemy/u.data"
)

In [None]:
# Keep only the third whitespace-separated field of every line (the rating).
ratings = lines.map(lambda record: record.split()[2])

In [None]:
# Count how often each rating value occurs; the dict-like result is iterated
# with .items() in the next cell.
result = ratings.countByValue()

In [None]:
# Rebuild the histogram in ascending key order, then print "<rating> <count>".
sortedResults = collections.OrderedDict()
for key in sorted(result):
    sortedResults[key] = result[key]

for key, value in sortedResults.items():
    print("%s %i" % (key, value))

In [None]:
def parseLine(line):
    """Turn one comma-separated record into an ``(age, numFriends)`` pair.

    The age is read from the third field and the friend count from the
    fourth; both are converted to ``int``.
    """
    columns = line.split(',')
    return (int(columns[2]), int(columns[3]))

lines = sc.textFile("s3://370784835428-datalake/udemy/SparkCourse/SparkCourse/fakefriends.csv")
# (age, numFriends) pairs, one per input line.
rdd = lines.map(parseLine)

# Per age: pair every friend count with a 1, then add the pairs element-wise,
# which yields (sum of friends, number of people).
totalsByAge = rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

# Brings the (sum, count) pairs to the driver; nothing below reads this list.
totalsByAgeCollected = totalsByAge.collect()

# Average number of friends per age = sum / count.
averagesByAge = totalsByAge.mapValues(lambda x: x[0] / x[1])

results = averagesByAge.collect()

# A dedicated loop name keeps the global `result` (the rating histogram that the
# earlier cell reads with result.items()) from being rebound to a tuple here.
for ageAverage in results:
    print(ageAverage)

# RDD: filter function

In [None]:
def parseLine(line):
    """Extract ``(stationID, entryType, temperature)`` from one CSV record.

    The first and third fields are passed through as strings. The fourth
    field is converted with ``float(value) * 0.1 * (9.0 / 5.0) + 32.0``.
    NOTE(review): the "F" suffix printed by the calling cell suggests this is
    Fahrenheit derived from tenths of a degree Celsius -- confirm against the
    data set's documentation.
    """
    columns = line.split(',')
    station, kind, raw = columns[0], columns[2], columns[3]
    # Same operand order as the formula in the docstring so float results match.
    converted = float(raw) * 0.1 * (9.0 / 5.0) + 32.0
    return (station, kind, converted)

lines = sc.textFile("s3://thanhtt-0000-datalake/udemy/SparkCourse/1800.csv")
parsedLines = lines.map(parseLine)
# Exact match on the entry type: a substring test ("TMIN" in ...) would also
# let through any entry type that merely contains "TMIN".
minTemps = parsedLines.filter(lambda x: x[1] == "TMIN")
# Drop the entry type, leaving (stationID, temperature).
stationTemps = minTemps.map(lambda x: (x[0], x[2]))
# Keep the lowest temperature seen for each station.
minTemps = stationTemps.reduceByKey(lambda x, y: min(x, y))
results = minTemps.collect()

# A dedicated loop name avoids rebinding the global `result` that the rating
# histogram cell reads.
for stationMin in results:
    print(stationMin[0] + "\t{:.2f}F".format(stationMin[1]))


# RDD: flatMap function

In [None]:
import re

def normalizeWords(text):
    """Lower-case ``text`` and split it into words.

    Words are separated by runs of non-word characters (``\\W+``, Unicode
    aware). Splitting alone produces empty strings when the text is empty or
    starts/ends with a separator; those are dropped so they are not counted
    as a word downstream.

    Args:
        text: One line of input text.

    Returns:
        A list of non-empty, lower-cased word strings (possibly empty).
    """
    pieces = re.split(r'\W+', text.lower(), flags=re.UNICODE)
    return [piece for piece in pieces if piece]

# Named `bookLines` rather than `input` so the built-in input() is not shadowed
# for the rest of the kernel session.
bookLines = sc.textFile("s3://thanhtt-0000-datalake/udemy/SparkCourse/book.txt")
words = bookLines.flatMap(normalizeWords)  # one RDD element per word instead of per line

wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
# Swap each pair to (count, word) so sortByKey() orders by count. The identity
# map that used to precede the swap did nothing and is gone.
wordCountsSorted = wordCounts.map(lambda x: (x[1], x[0])).sortByKey()
# The per-item printout is intentionally left out: the list is too long.
print("Result is commented because it's too long")
results = wordCountsSorted.collect()

# for result in results:
#     count = str(result[0]) 
#     word = result[1].encode("ascii","ignore")
    
#     # comment because the list is too long
#     if(word):
#         print(cleanWord,count)


# **DataFrame: Spark SQL**

In [None]:
from pyspark.sql import Row

def mapper(line):
    """Convert one comma-separated line into a ``Row``.

    Args:
        line: Text with at least four comma-separated fields.

    Returns:
        ``Row(ID, name, age, numFriends)`` where ``ID``, ``age`` and
        ``numFriends`` are ``int`` and ``name`` is the second field as text.

    Raises:
        IndexError: if the line has fewer than four fields.
        ValueError: if a numeric field cannot be parsed as ``int``.
    """
    fields = line.split(',')
    # On Python 3, str(s.encode("utf-8")) produces the bytes repr ("b'Will'"),
    # so the name is stored as the text it already is.
    return Row(ID=int(fields[0]), name=fields[1],
               age=int(fields[2]), numFriends=int(fields[3]))

lines = sc.textFile("s3://thanhtt-0000-datalake/udemy/SparkCourse/fakefriends.csv")
people = lines.map(mapper)

# Build a DataFrame from the Row objects (schema inferred from their fields),
# cache it, and expose it to SQL under the view name "people".
schemaPeople = spark.createDataFrame(people)
schemaPeople = schemaPeople.cache()
schemaPeople.createOrReplaceTempView("people")

# Query the registered view with plain SQL; the result is another DataFrame.
teenagers = spark.sql("SELECT * FROM people WHERE age >=13 and age <=19")

# collect() brings the matching rows to the driver so they can be printed.
for teen in teenagers.collect():
    print(teen)

# DataFrame methods can be used instead of SQL: people per age, ordered by age.
(
    schemaPeople
    .groupBy("age")
    .count()
    .orderBy("age")
    .show()
)


# **DataFrame: Infer Schema and common functions to work with**

In [None]:
from pyspark.sql import functions as func

# Take column names from the header row and let Spark infer the column types.
people = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("s3://thanhtt-0000-datalake/udemy/SparkCourse/fakefriends-header.csv")
)

print("here is our inferred schema")
people.printSchema()

print("display the name column")
people.select("name").show()

# Note: the predicate keeps ages strictly below 21.
print("filter out anyone over 21")
people.filter(people["age"] < 21).show()

print("group by age")
people.groupBy("age").count().show()

print("make everyone 10 year older")
people.select(people["name"], people["age"] + 10).show()

# The remaining examples only need the age and friends columns.
print("sorted")
friendByAge = people.select("age", "friends")
(
    friendByAge
    .groupBy("age")
    .avg("friends")
    .sort("age")
    .show()
)

# Same average, rounded to two decimals.
print("formatted more nicely")
(
    friendByAge
    .groupBy("age")
    .agg(func.round(func.avg("friends"), 2))
    .sort("age")
    .show()
)

# Same again, with the aggregate column renamed via alias().
print("with a custom column name")
(
    friendByAge
    .groupBy("age")
    .agg(func.round(func.avg("friends"), 2).alias("friends_avg"))
    .sort("age")
    .show()
)


# **Word count with DataFrame (split unstructured text into a multi-row DataFrame)**

In [None]:
# Read each line of the book into a DataFrame (accessed below as column `value`).
inputDF = spark.read.text("s3://thanhtt-0000-datalake/udemy/SparkCourse/book.txt")
inputDF.show()

# Split each line on runs of non-word characters and explode to one word per row.
words = inputDF.select(func.explode(func.split(inputDF.value, "\\W+")).alias("word"))
# filter() returns a new DataFrame rather than modifying `words`, so the result
# must be assigned; otherwise the empty strings are still present when shown.
words = words.filter(words.word != "")

words.show()

# **DataFrame: StructType, StructField, IntegerType, FloatType**

In [None]:
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType

# Explicit schema for the customer-orders CSV; no header option is set, so the
# column names and types come from here.
customerOrderSchema = StructType([
    StructField("cust_id", IntegerType(), True),
    StructField("item_id", IntegerType(), True),
    StructField("amount_spent", FloatType(), True),
])

# Load the data into a DataFrame using that schema.
customerDF = spark.read.schema(customerOrderSchema).csv("s3://370784835428-datalake/udemy/SparkCourse/SparkCourse/customer-orders.csv")

# Total spent per customer, rounded to two decimals.
totalByCustomer = customerDF.groupBy("cust_id").agg(func.round(func.sum("amount_spent"), 2).alias("total_spent"))

totalByCustomerSorted = totalByCustomer.sort("total_spent")

# Display the *sorted* frame, all rows. Showing `totalByCustomer` here would
# print the totals in arbitrary order and leave the sort unused.
totalByCustomerSorted.show(totalByCustomerSorted.count())



# **DataFrame: OrderBy function**

In [None]:
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, LongType

# Column layout applied when reading u.data.
schema = StructType([
    StructField("userID", IntegerType(), True),
    StructField("movieID", IntegerType(), True),
    StructField("rating", IntegerType(), True),
    StructField("timestamp", LongType(), True),
])

# Read the tab-separated ratings file as a DataFrame with that schema.
moviesDF = (
    spark.read
    .option("sep", "\t")
    .schema(schema)
    .csv("s3://370784835428-datalake/udemy/u.data")
)

# Count the rows per movie, then order by that count, largest first.
ratingsPerMovie = moviesDF.groupBy("movieID").count()
topMovieIDs = ratingsPerMovie.orderBy(func.col("count").desc())

# Display the first 10 rows.
topMovieIDs.show(10)


# **DataFrame: broadcast function**

In [None]:
# Install the boto3 Python library, used in a later cell to download a file from S3.
sc.install_pypi_package("boto3")


In [None]:
# `os` is imported here because this cell runs before the cell that holds the
# notebook's other `import os`; without it a fresh kernel raises NameError.
import os

# Switch the working directory to /tmp and list what it contains.
os.chdir("/tmp")
cwd = os.getcwd()  # the current working directory after the chdir
files = os.listdir(cwd)  # names of the entries in that directory
print("Files in %r: %s" % (cwd, files))

In [14]:
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, LongType
import codecs 
import os 
import boto3

# Copy the item file from S3 to local disk, where loadMovieNames() reads it.
s3client = boto3.client("s3")
s3client.download_file(
    Bucket="370784835428-datalake",
    Key="udemy/u.item",
    Filename="/tmp/u.item",
)

def loadMovieNames(path="/tmp/u.item"):
    """Build a ``{movieID: title}`` dict from a pipe-delimited item file.

    Args:
        path: File to read. Defaults to ``/tmp/u.item``, so existing
            zero-argument calls behave as before.

    Returns:
        dict mapping the integer in the first field of each line to the text
        of the second field. Lines with fewer than two fields (for example
        blank lines) are skipped.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if a line's first field is not an integer.
    """
    movieNames = {}

    # Built-in open() takes the same encoding/errors arguments; codecs.open()
    # is a legacy API that it supersedes.
    with open(path, "r", encoding='ISO-8859-1', errors='ignore') as f:
        for line in f:
            fields = line.split('|')
            # Skip blank or truncated lines instead of failing on fields[1].
            if len(fields) < 2:
                continue
            movieNames[int(fields[0])] = fields[1]
    return movieNames

# Broadcast the {movieID: title} dict; the UDF below reads it via nameDict.value.
nameDict = sc.broadcast(loadMovieNames())

# Column layout applied when reading u.data.
schema = StructType([
    StructField("userID", IntegerType(), True),
    StructField("movieID", IntegerType(), True),
    StructField("rating", IntegerType(), True),
    StructField("timestamp", LongType(), True),
])

# Read the tab-separated ratings file as a DataFrame with that schema.
moviesDF = (
    spark.read
    .option("sep", "\t")
    .schema(schema)
    .csv("s3://370784835428-datalake/udemy/u.data")
)

# Number of rows (ratings) per movie.
ratingsByMovie = moviesDF.groupBy("movieID")
movieCounts = ratingsByMovie.count()

# User-defined function that looks up a movie name in the broadcast dictionary.
def lookupName(movieID):
    """Return the title stored for ``movieID`` in the broadcast ``nameDict``.

    Args:
        movieID: Key to look up in ``nameDict.value``.

    Returns:
        The stored title, or ``None`` when the ID has no entry. Returning
        ``None`` avoids a ``KeyError`` that, raised inside a UDF, would fail
        the whole Spark job over a single unknown or null ID.
    """
    return nameDict.value.get(movieID)

# Wrap the Python lookup so it can be applied to a DataFrame column.
lookupNameUDF = func.udf(lookupName)

# Add a "movieTitle" column computed from each row's movieID.
titleColumn = lookupNameUDF(func.col("movieID"))
moviesWithNames = movieCounts.withColumn("movieTitle", titleColumn)

# Order by the count column, largest first.
sortedMoviesWithNames = moviesWithNames.orderBy(func.col("count").desc())

# Show the first 10 rows without truncating long titles.
sortedMoviesWithNames.show(10, truncate=False)


    

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-----+-----------------------------+
|movieID|count|movieTitle                   |
+-------+-----+-----------------------------+
|50     |583  |Star Wars (1977)             |
|258    |509  |Contact (1997)               |
|100    |508  |Fargo (1996)                 |
|181    |507  |Return of the Jedi (1983)    |
|294    |485  |Liar Liar (1997)             |
|286    |481  |English Patient, The (1996)  |
|288    |478  |Scream (1996)                |
|1      |452  |Toy Story (1995)             |
|300    |431  |Air Force One (1997)         |
|121    |429  |Independence Day (ID4) (1996)|
+-------+-----+-----------------------------+
only showing top 10 rows