In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.master("local").appName("Linear Regression Model").config("spark.executor.memory", "1gb").getOrCreate()

In [3]:
sc = spark.sparkContext

In [4]:
df = spark.read.csv("ratings.csv", header=True)

In [5]:
df.take(5)

[Row(user_id=u'1', book_id=u'258', rating=u'5'),
 Row(user_id=u'2', book_id=u'4081', rating=u'4'),
 Row(user_id=u'2', book_id=u'260', rating=u'5'),
 Row(user_id=u'2', book_id=u'9296', rating=u'5'),
 Row(user_id=u'2', book_id=u'2318', rating=u'3')]

In [6]:
df.describe().show()

+-------+------------------+------------------+------------------+
|summary|           user_id|           book_id|            rating|
+-------+------------------+------------------+------------------+
|  count|           5976479|           5976479|           5976479|
|   mean|26224.457362102334|2006.4773984816143|3.9198655261735214|
| stddev|15413.234093272344| 2468.499463379664|0.9910868103700352|
|    min|                 1|                 1|                 1|
|    max|              9999|              9999|                 5|
+-------+------------------+------------------+------------------+



In [7]:
df.columns

['user_id', 'book_id', 'rating']

In [8]:
from pyspark.sql.types import *

In [9]:
from pyspark.sql.functions import col

In [10]:
df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- rating: string (nullable = true)



In [11]:
df = df.withColumn("user_id", df["user_id"].cast(IntegerType()))
df = df.withColumn("book_id", df['book_id'].cast(IntegerType()))
df = df.withColumn("rating", df['rating'].cast(FloatType()))

In [12]:
df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- book_id: integer (nullable = true)
 |-- rating: float (nullable = true)



In [13]:
df.describe().show()

+-------+------------------+------------------+------------------+
|summary|           user_id|           book_id|            rating|
+-------+------------------+------------------+------------------+
|  count|           5976479|           5976479|           5976479|
|   mean|26224.457362102334|2006.4773984816143|3.9198655261735214|
| stddev|15413.234093272344| 2468.499463379664|0.9910868103700352|
|    min|                 1|                 1|               1.0|
|    max|             53424|             10000|               5.0|
+-------+------------------+------------------+------------------+



In [14]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [15]:
(training, test) = df.randomSplit([0.8,0.2])

In [18]:
als = ALS(maxIter=5, regParam=0.01, userCol="user_id", itemCol="book_id", ratingCol="rating",
          coldStartStrategy="drop")

In [19]:
model = als.fit(training)

In [20]:
predictions = model.transform(test)

In [21]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

In [22]:
print("Root Mean Squared Error is" + str(rmse))

Root Mean Squared Error is0.838717109316


In [23]:
userRecs = model.recommendForAllUsers(10)

In [57]:
userRecs.show(5)

+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|    148|[[9214,5.2479763]...|
|    463|[[8946,5.411862],...|
|    471|[[8372,5.459212],...|
|    496|[[6819,6.7116385]...|
|    833|[[8372,5.612702],...|
+-------+--------------------+
only showing top 5 rows



In [56]:
rec148 = userRecs.first()

AttributeError: select

In [43]:
book_ids = []
for x in rec148.recommendations:
    book_ids.append(x[0])

In [47]:
type(book_ids[0])

int

In [32]:
books = spark.read.csv("books.csv", header=True)

In [50]:
books.printSchema

<bound method DataFrame.printSchema of DataFrame[book_id: int, goodreads_book_id: string, best_book_id: string, work_id: string, books_count: string, isbn: string, isbn13: string, authors: string, original_publication_year: string, original_title: string, title: string, language_code: string, average_rating: string, ratings_count: string, work_ratings_count: string, work_text_reviews_count: string, ratings_1: string, ratings_2: string, ratings_3: string, ratings_4: string, ratings_5: string, image_url: string, small_image_url: string]>

In [49]:
books = books.withColumn("book_id", books["book_id"].cast(IntegerType()))

In [39]:
from pyspark.sql import DataFrame

In [41]:
from pyspark.sql.functions import array, lit

In [65]:
stupid_set = set(range(2,100))

In [74]:
books.filter(books.book_id.isin(book_ids)).select('original_title').collect()

[Row(original_title=None),
 Row(original_title=u'The Creative License: Giving Yourself Permission to Be The Artist You Truly Are'),
 Row(original_title=u"Founders at Work: Stories of Startups' Early Days"),
 Row(original_title=u"Grain Brain: The Surprising Truth about Wheat, Carbs, and Sugar--Your Brain's Silent Killers"),
 Row(original_title=u'The Autobiography of Martin Luther King, Jr.'),
 Row(original_title=u'The Orenda'),
 Row(original_title=u'Colonel Roosevelt'),
 Row(original_title=u'Hafalan Shalat Delisa'),
 Row(original_title=u'Call Me by Your Name'),
 Row(original_title=u'Humans of New York: Stories')]

In [None]:
sqlContext.sql('SELECT original_title FROM books WHERE book_id == ').show(5)