In [0]:
# A1

from pyspark import SparkContext
sc = SparkContext.getOrCreate()

data = sc.textFile("dbfs:/FileStore/MIE_1628_A2/integer.txt")

int_data = data.map(lambda x: int(x))

even_numbers = int_data.filter(lambda x: x % 2 == 0).count()
odd_numbers = int_data.filter(lambda x: x % 2 == 1).count()

print(f'even_numbers={even_numbers}')
print(f'odd_numbers={odd_numbers}')

even_numbers=514
odd_numbers=496


In [0]:
# A2

data = sc.textFile("dbfs:/FileStore/MIE_1628_A2/salary.txt")
res = data.map(lambda line: (line.split(' ')[0], int(line.split(' ')[1]))).reduceByKey(lambda x,y: x + y)
print(f'salary_sum={res.collect()}')

salary_sum=[('Sales', 3488491), ('Research', 3328284), ('Developer', 3221394), ('QA', 3360624), ('Marketing', 3158450)]


In [0]:
# A3
from operator import add
keywords = ['Shakespeare', 'why', 'Lord', 'Library', 'GUTENBERG', 'WILLIAM', 'COLLEGE', 'WORLD']

rdd = sc.textFile("dbfs:/FileStore/MIE_1628_A2/shakespeare_1.txt")
words = rdd.flatMap(lambda line: line.split(' '))

keyword_rdd = words.filter(lambda word : word in keywords)

keyword_pairs = keyword_rdd.map(lambda keyword: (keyword, 1))
counts = keyword_pairs.reduceByKey(add)

print(f'count of keywords={counts.collect()}')

count of keywords=[('Shakespeare', 22), ('GUTENBERG', 99), ('WILLIAM', 115), ('WORLD', 98), ('COLLEGE', 98), ('why', 91), ('Lord', 341), ('Library', 2)]


In [0]:
# A4
rdd = sc.textFile("dbfs:/FileStore/MIE_1628_A2/shakespeare_1.txt")
words = rdd.flatMap(lambda line: line.split(' ')).flatMap(lambda line: line.split(',')).flatMap(lambda line: line.split(':')).flatMap(lambda line: line.split('*')).filter(lambda word : len(word) > 0)
key_values = words.map(lambda word: (word, 1))
counts = key_values.reduceByKey(add)


df = counts.toDF(["word", "count"])
df.createOrReplaceTempView("word_counts")

top_15 = spark.sql("SELECT * FROM word_counts ORDER BY count DESC LIMIT 15")
print("top 15 words:")
top_15.show()

bottom_15 = spark.sql("SELECT * FROM word_counts ORDER BY count ASC LIMIT 15")
print("bottom 15 word:")
bottom_15.show()

top 15 words:
+----+-----+
|word|count|
+----+-----+
| the|11397|
| and| 8904|
|   I| 8706|
|  of| 7888|
|  to| 7453|
|   a| 5673|
|  my| 4914|
| you| 4683|
|  in| 4642|
| And| 3732|
|that| 3636|
|  is| 3584|
| not| 3347|
| his| 3242|
|with| 3183|
+----+-----+

bottom 15 word:
+--------------+-----+
|          word|count|
+--------------+-----+
|         START|    1|
|          2011|    1|
|     cooperate|    1|
|       NEITHER|    1|
|         read!|    1|
|       January|    1|
|       License|    1|
|        anyone|    1|
|         Title|    1|
|          1994|    1|
|         EBOOK|    1|
|  restrictions|    1|
|     September|    1|
|     Character|    1|
|WORKS--WILLIAM|    1|
+--------------+-----+



In [0]:
# B1
df = spark.read.csv("dbfs:/FileStore/MIE_1628_A2/movies.csv", inferSchema=True, header=True)
df.createOrReplaceTempView("movies")
top_20_movies = spark.sql("""
    SELECT movieId, AVG(rating) as avg_rating 
    FROM movies 
    GROUP BY movieId 
    ORDER BY avg_rating DESC 
    LIMIT 20""")
top_20_movies.show()

top_15_users = spark.sql("""
    SELECT userId, AVG(rating)
    FROM movies
    GROUP BY userId
    ORDER BY avg(rating) DESC
    LIMIT 15""")
top_15_users.show()

+-------+------------------+
|movieId|        avg_rating|
+-------+------------------+
|     32|2.9166666666666665|
|     90|            2.8125|
|     30|               2.5|
|     94| 2.473684210526316|
|     23| 2.466666666666667|
|     49|            2.4375|
|     29|               2.4|
|     18|               2.4|
|     52| 2.357142857142857|
|     53|              2.25|
|     62|              2.25|
|     92|2.2142857142857144|
|     46|               2.2|
|     68|2.1578947368421053|
|     87|2.1333333333333333|
|      2|2.1052631578947367|
|     69| 2.076923076923077|
|     27| 2.066666666666667|
|     88|2.0555555555555554|
|     22|              2.05|
+-------+------------------+

+------+------------------+
|userId|       avg(rating)|
+------+------------------+
|    11|2.2857142857142856|
|    26| 2.204081632653061|
|    22|2.1607142857142856|
|    23|2.1346153846153846|
|     2|2.0652173913043477|
|    17|1.9565217391304348|
|     8|1.8979591836734695|
|    24|1.8846153846153

In [0]:
# B2, B3
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

df = spark.read.csv("dbfs:/FileStore/MIE_1628_A2/movies.csv", inferSchema=True, header=True)
(train, test) = df.randomSplit([0.8, 0.2])

als = ALS(userCol='userId', itemCol='movieId', ratingCol='rating', coldStartStrategy='drop')

model = als.fit(train)
prediction = model.transform(test)

rmse_evaluator = RegressionEvaluator(
    metricName='rmse',
    labelCol='rating',
    predictionCol='prediction'
)

mae_evaluator = RegressionEvaluator(
    metricName='mae',
    labelCol='rating',
    predictionCol='prediction'
)

rmse = rmse_evaluator.evaluate(prediction)
mae = mae_evaluator.evaluate(prediction)
print("root mean square error for [80, 20] split=" + str(rmse))
print("mean absolute error for [80, 20] split=" + str(mae))
print("prediction for [80, 20] split:")
prediction.show()


(train, test) = df.randomSplit([0.6, 0.4])
model = als.fit(train)
prediction = model.transform(test)
rmse = rmse_evaluator.evaluate(prediction)
mae = mae_evaluator.evaluate(prediction)
print("root mean square error for [60, 40] split=" + str(rmse))
print("mean absolute error for [60, 40] split=" + str(mae))
print("prediction for [60, 40] split:")
prediction.show()

root mean square error for [80, 20] split=0.9777890272123972
mean absolute error for [80, 20] split=0.6860865863743207
prediction for [80, 20] split:
+-------+------+------+----------+
|movieId|rating|userId|prediction|
+-------+------+------+----------+
|      1|     1|    28| 0.9467137|
|     19|     3|    28| 2.2585254|
|     36|     1|    28| 1.4560488|
|     38|     2|    28| 1.6508074|
|     50|     1|    28| 1.3009136|
|     54|     1|    28| 0.9556073|
|     62|     3|    28| 1.2522787|
|     65|     1|    28|0.74010944|
|     88|     2|    28| 1.3611788|
|     98|     1|    28| 1.2618763|
|     99|     1|    28| 1.1259569|
|      1|     1|    26|  1.637441|
|      3|     1|    26| 0.8077318|
|      7|     5|    26| 1.6972333|
|      9|     1|    27| 0.6673592|
|     13|     3|    26| 1.6228696|
|     16|     1|    26| 1.2412927|
|     24|     5|    26| 2.4115663|
|     33|     3|    27| 1.1142951|
|     44|     1|    26| 1.2683893|
+-------+------+------+----------+
only showi

In [0]:
#B3
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
(train, test) = df.randomSplit([0.8, 0.2])

parameters=ParamGridBuilder()\
    .addGrid(als.rank,[10, 50, 100])\
    .addGrid(als.maxIter, [10, 15, 20])\
    .addGrid(als.regParam, [0.01, 0.05, 0.1])\
    .build()

eval = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')

trainvs = TrainValidationSplit(estimator=als, estimatorParamMaps=parameters, evaluator=eval)
cv = CrossValidator(estimator=als, estimatorParamMaps=parameters, evaluator=eval, numFolds=3)

model = trainvs.fit(train)
predictions = model.transform(test)
rmse = eval.evaluate(predictions)

print("best ranks=", model.bestModel._java_obj.parent().getRank())
print("best max iter=", model.bestModel._java_obj.parent().getMaxIter())
print("best regParam=", model.bestModel._java_obj.parent().getRegParam())
print("rmse=", rmse)



best ranks= 50
best max iter= 20
best regParam= 0.1
rmse= 0.9681408715390586


In [0]:
# B5

best_model = model.bestModel
user_df = spark.createDataFrame([(10,), (14,)], ['userId'])
recommendations = best_model.recommendForUserSubset(user_df, 15)
recommendations.show(truncate=False)



+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                                                                                                         |
+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|10    |[{40, 3.3974857}, {92, 3.3556578}, {2, 3.0700889}, {89, 2.9030402}, {12, 2.8274043}, {62, 2.727406}, {25, 2.7179892}, {49, 2.647411}, {42, 2.504928}, {39, 2.3664346}, {4, 2.2915287}, {82, 2.2527263}, {0, 2.2284079}, {9