In [1]:
from pyspark.sql import SparkSession, Row, Column as Col, functions as F
from pyspark.sql.types import BooleanType
from pyspark import SparkConf, SparkContext

In [2]:
import os
# Tell PySpark where Spark lives and use the same interpreter for driver and workers
os.environ.update({
    'SPARK_HOME': '/opt/spark',
    'PYSPARK_PYTHON': '/usr/bin/python3',
    'PYSPARK_DRIVER_PYTHON': '/usr/bin/python3',
})

In [3]:
from flask import Flask, request
from flask_restful import Api, Resource

In [4]:
# Flask application, wrapped by flask_restful so Resource classes can be registered on it
app = Flask(import_name=__name__)
api = Api(app=app)

In [20]:
@app.route('/')
def hello_world():
    """Serve the root URL with a fixed plain-text greeting."""
    greeting = 'Hello, World!'
    return greeting

In [5]:
# Spark configuration: local master, application name 'Notflix'
spark_conf = SparkConf()
spark_conf.setMaster('local')
spark_conf.setAppName('Notflix')

In [6]:
# Reuse the active SparkContext if one exists, otherwise start one from spark_conf
sc = SparkContext.getOrCreate(conf=spark_conf)
print(sc)

<SparkContext master=local appName=Notflix>


In [7]:
# SQL entry point built from the same configuration as the context
spark = (
    SparkSession.builder
    .config(conf=spark_conf)
    .getOrCreate()
)

In [8]:
# Sanity check: show that a SparkSession object was created
print(spark)

<pyspark.sql.session.SparkSession object at 0x7f05ac59efd0>


In [9]:
# Load the sample info table from HDFS; the first line is the header and column types are inferred
infoDF = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('hdfs:///home/test.csv')
)

In [11]:
# Print the sample table to check that it loaded as expected
infoDF.show()

+---+-----+------+-----+
|vid|liked|viewed| list|
+---+-----+------+-----+
|  0| true|  true|false|
|  1| true|  true| true|
|  2|false|  true|false|
|  3|false|  true| true|
|  4|false| false| true|
+---+-----+------+-----+



In [72]:
# Print the column names and the types inferred from the CSV
infoDF.printSchema()

root
 |-- vid: integer (nullable = true)
 |-- liked: boolean (nullable = true)
 |-- viewed: boolean (nullable = true)
 |-- list: boolean (nullable = true)



In [10]:
def saveProfileInfo(result, profile_path):
    """Persist a profile's info table as a single CSV file on HDFS.

    The frame is coalesced to one partition and written (with a header,
    overwriting any previous output) to the directory ``{profile_path}/info``.
    The part file in that directory is then copied to
    ``{profile_path}/info.csv``, replacing the previous file.

    Args:
        result: Spark DataFrame to persist.
        profile_path: HDFS directory of the profile.

    Raises:
        subprocess.CalledProcessError: if copying the part file to
            ``info.csv`` fails.
        FileNotFoundError: if the ``hdfs`` executable is not on PATH.
    """
    import subprocess  # local import keeps this cell independent of the imports cell

    info_dir = f'{profile_path}/info'
    info_csv = f'{profile_path}/info.csv'

    result.coalesce(1).write.format('csv').save(info_dir, header=True, mode='overwrite')

    # Argument lists are passed without a shell, so characters in uid/profile
    # cannot be interpreted as extra shell commands.
    # `-f` makes the removal a no-op when info.csv does not exist yet.
    subprocess.run(['hdfs', 'dfs', '-rm', '-f', info_csv], check=False)
    # The `p*` glob is expanded by the HDFS shell itself; a failed copy would
    # leave the profile without info.csv, so it is reported instead of ignored.
    subprocess.run(['hdfs', 'dfs', '-cp', f'{info_dir}/p*', info_csv], check=True)

In [11]:
def likeVideo(uid, profile, vid):
    """Mark video ``vid`` as liked in a profile's info table.

    Reads the profile's ``info.csv``, sets ``liked`` to true on the row(s)
    for ``vid`` (or appends a new row with only ``liked`` set when the video
    has no entry yet), prints the table before and after, and writes the
    result back through ``saveProfileInfo``.

    Args:
        uid: User id, used to build the profile path.
        profile: Profile name, used to build the profile path.
        vid: Video id; coerced with ``int()``.

    Raises:
        ValueError, TypeError: if ``vid`` cannot be converted to ``int``.
    """
    # One numeric type for both the comparison and the appended row; a string
    # vid previously never matched in the Python-side filter and duplicated the row.
    vid = int(vid)
    profile_path = f'/home/users/{uid}/profiles/{profile}'
    # Get Dataframe
    infoDF = spark.read.csv(f'{profile_path}/info.csv', header=True, inferSchema=True)
    infoDF.show()
    # NOTE(review): the lookup below no longer uses the `info` view; it is still
    # registered in case other code queries it - drop once confirmed unused.
    infoDF.createOrReplaceTempView('info')
    # Get vid row (column expression instead of SQL text built from the argument)
    is_target = F.col('vid').eqNullSafe(vid)
    target_rows = infoDF.filter(is_target)
    if len(target_rows.take(1)) == 0:  # No existing entry for the vid
        columns = ['vid', 'liked', 'viewed', 'list']
        target_rows = spark.createDataFrame([(vid, True, False, False)], columns)
    else:
        target_rows = target_rows.withColumn('liked', F.lit(True))
    # DataFrame filter rather than rdd.filter(...).toDF(): toDF() cannot infer a
    # schema from an empty RDD, which happened when `vid` was the only row.
    # eqNullSafe keeps rows whose vid is null, as the Python `!=` filter did.
    other_rows = infoDF.filter(~is_target)
    result = other_rows.union(target_rows)
    result.show()
    saveProfileInfo(result, profile_path)

In [12]:
# Manual check: like video 5 on user 0's 'test' profile (rewrites that profile's info.csv)
likeVideo(uid=0, profile='test', vid=5)

+---+-----+------+-----+
|vid|liked|viewed| list|
+---+-----+------+-----+
|  1| true|  true| true|
|  2|false|  true|false|
|  3|false|  true| true|
|  0| true|  true|false|
|  5| true| false|false|
|  4|false|  true| true|
+---+-----+------+-----+

+---+-----+------+-----+
|vid|liked|viewed| list|
+---+-----+------+-----+
|  1| true|  true| true|
|  2|false|  true|false|
|  3|false|  true| true|
|  0| true|  true|false|
|  4|false|  true| true|
|  5| true| false|false|
+---+-----+------+-----+



In [13]:
def dislikeVideo(uid, profile, vid):
    """Clear the liked flag of video ``vid`` in a profile's info table.

    Reads the profile's ``info.csv``, sets ``liked`` to false on the row(s)
    for ``vid`` (or appends a new all-false row when the video has no entry
    yet), prints the table before and after, and writes the result back
    through ``saveProfileInfo``.

    Args:
        uid: User id, used to build the profile path.
        profile: Profile name, used to build the profile path.
        vid: Video id; coerced with ``int()``.

    Raises:
        ValueError, TypeError: if ``vid`` cannot be converted to ``int``.
    """
    # One numeric type for both the comparison and the appended row; a string
    # vid previously never matched in the Python-side filter and duplicated the row.
    vid = int(vid)
    profile_path = f'/home/users/{uid}/profiles/{profile}'
    # Get Dataframe
    infoDF = spark.read.csv(f'{profile_path}/info.csv', header=True, inferSchema=True)
    infoDF.show()
    # NOTE(review): the lookup below no longer uses the `info` view; it is still
    # registered in case other code queries it - drop once confirmed unused.
    infoDF.createOrReplaceTempView('info')
    # Get vid row (column expression instead of SQL text built from the argument)
    is_target = F.col('vid').eqNullSafe(vid)
    target_rows = infoDF.filter(is_target)
    if len(target_rows.take(1)) == 0:  # No existing entry for the vid
        columns = ['vid', 'liked', 'viewed', 'list']
        target_rows = spark.createDataFrame([(vid, False, False, False)], columns)
    else:
        target_rows = target_rows.withColumn('liked', F.lit(False))
    # DataFrame filter rather than rdd.filter(...).toDF(): toDF() cannot infer a
    # schema from an empty RDD, which happened when `vid` was the only row.
    # eqNullSafe keeps rows whose vid is null, as the Python `!=` filter did.
    other_rows = infoDF.filter(~is_target)
    result = other_rows.union(target_rows)
    result.show()
    saveProfileInfo(result, profile_path)

In [229]:
# Manual check: un-like video 5 on user 0's 'test' profile (rewrites that profile's info.csv)
dislikeVideo(uid=0, profile='test', vid=5)

+---+-----+------+-----+
|vid|liked|viewed| list|
+---+-----+------+-----+
|  1| true|  true| true|
|  2|false|  true|false|
|  3|false|  true| true|
|  0| true|  true|false|
|  4|false|  true| true|
|  5| true| false|false|
+---+-----+------+-----+

+---+-----+------+-----+
|vid|liked|viewed| list|
+---+-----+------+-----+
|  1| true|  true| true|
|  2|false|  true|false|
|  3|false|  true| true|
|  0| true|  true|false|
|  4|false|  true| true|
|  5|false| false|false|
+---+-----+------+-----+



In [14]:
def getLikedVideos(uid, profile):
    """Return the ids of the videos marked as liked in a profile's info table.

    Loads the profile's ``info.csv``, registers it as the temp view ``info``,
    selects the rows where ``liked`` is true and prints both the full table
    and the selection along the way.

    Args:
        uid: User id, used to build the profile path.
        profile: Profile name, used to build the profile path.

    Returns:
        list[int]: the ``vid`` of every liked row.
    """
    info_csv = f'/home/users/{uid}/profiles/{profile}/info.csv'

    # Load the profile table and expose it to Spark SQL
    profile_info = spark.read.csv(info_csv, header=True, inferSchema=True)
    profile_info.show()
    profile_info.createOrReplaceTempView('info')

    # Keep only the liked rows
    liked_rows = spark.sql('SELECT * FROM info WHERE liked = true')
    liked_rows.show()

    liked_ids = []
    for record in liked_rows.collect():
        liked_ids.append(int(record.vid))
    return liked_ids

In [216]:
# Manual check: fetch and print the liked video ids of user 0's 'test' profile
vids = getLikedVideos(uid=0, profile='test')
print(vids)

+---+-----+------+-----+
|vid|liked|viewed| list|
+---+-----+------+-----+
|  1| true|  true| true|
|  2|false|  true|false|
|  3|false|  true| true|
|  4|false| false| true|
|  0| true|  true|false|
|  5|false| false|false|
+---+-----+------+-----+

+---+-----+------+-----+
|vid|liked|viewed| list|
+---+-----+------+-----+
|  1| true|  true| true|
|  0| true|  true|false|
+---+-----+------+-----+

[1, 0]


In [15]:
def viewVideo(uid, profile, vid):
    """Mark video ``vid`` as viewed in a profile's info table.

    Reads the profile's ``info.csv``, sets ``viewed`` to true on the row(s)
    for ``vid`` (or appends a new row with only ``viewed`` set when the video
    has no entry yet), prints the table before and after, and writes the
    result back through ``saveProfileInfo``.

    Args:
        uid: User id, used to build the profile path.
        profile: Profile name, used to build the profile path.
        vid: Video id; coerced with ``int()``.

    Raises:
        ValueError, TypeError: if ``vid`` cannot be converted to ``int``.
    """
    # One numeric type for both the comparison and the appended row; a string
    # vid previously never matched in the Python-side filter and duplicated the row.
    vid = int(vid)
    profile_path = f'/home/users/{uid}/profiles/{profile}'
    # Get Dataframe
    infoDF = spark.read.csv(f'{profile_path}/info.csv', header=True, inferSchema=True)
    infoDF.show()
    # NOTE(review): the lookup below no longer uses the `info` view; it is still
    # registered in case other code queries it - drop once confirmed unused.
    infoDF.createOrReplaceTempView('info')
    # Get vid row (column expression instead of SQL text built from the argument)
    is_target = F.col('vid').eqNullSafe(vid)
    target_rows = infoDF.filter(is_target)
    if len(target_rows.take(1)) == 0:  # No existing entry for the vid
        columns = ['vid', 'liked', 'viewed', 'list']
        target_rows = spark.createDataFrame([(vid, False, True, False)], columns)
    else:
        target_rows = target_rows.withColumn('viewed', F.lit(True))
    # DataFrame filter rather than rdd.filter(...).toDF(): toDF() cannot infer a
    # schema from an empty RDD, which happened when `vid` was the only row.
    # eqNullSafe keeps rows whose vid is null, as the Python `!=` filter did.
    other_rows = infoDF.filter(~is_target)
    result = other_rows.union(target_rows)
    result.show()
    saveProfileInfo(result, profile_path)

In [233]:
# Manual check: mark video 4 as viewed on user 0's 'test' profile (rewrites that profile's info.csv)
viewVideo(uid=0, profile='test', vid=4)

+---+-----+------+-----+
|vid|liked|viewed| list|
+---+-----+------+-----+
|  1| true|  true| true|
|  2|false|  true|false|
|  3|false|  true| true|
|  0| true|  true|false|
|  4|false|  true| true|
|  5| true| false|false|
+---+-----+------+-----+

+---+-----+------+-----+
|vid|liked|viewed| list|
+---+-----+------+-----+
|  1| true|  true| true|
|  2|false|  true|false|
|  3|false|  true| true|
|  0| true|  true|false|
|  5| true| false|false|
|  4|false|  true| true|
+---+-----+------+-----+



In [16]:
def addToList(uid, profile, vid):
    """Add video ``vid`` to a profile's list in the info table.

    Reads the profile's ``info.csv``, sets ``list`` to true on the row(s)
    for ``vid`` (or appends a new row with only ``list`` set when the video
    has no entry yet), prints the table before and after, and writes the
    result back through ``saveProfileInfo``.

    Args:
        uid: User id, used to build the profile path.
        profile: Profile name, used to build the profile path.
        vid: Video id; coerced with ``int()``.

    Raises:
        ValueError, TypeError: if ``vid`` cannot be converted to ``int``.
    """
    # One numeric type for both the comparison and the appended row; a string
    # vid previously never matched in the Python-side filter and duplicated the row.
    vid = int(vid)
    profile_path = f'hdfs:///home/users/{uid}/profiles/{profile}'
    # Get Dataframe
    infoDF = spark.read.csv(f'{profile_path}/info.csv', header=True, inferSchema=True)
    infoDF.show()
    # NOTE(review): the lookup below no longer uses the `info` view; it is still
    # registered in case other code queries it - drop once confirmed unused.
    infoDF.createOrReplaceTempView('info')
    # Get vid row (column expression instead of SQL text built from the argument)
    is_target = F.col('vid').eqNullSafe(vid)
    target_rows = infoDF.filter(is_target)
    if len(target_rows.take(1)) == 0:  # No existing entry for the vid
        columns = ['vid', 'liked', 'viewed', 'list']
        target_rows = spark.createDataFrame([(vid, False, False, True)], columns)
    else:
        # Set the flag directly. The earlier code overwrote `list` with the
        # value of `viewed`, so an unviewed video was never added to the list.
        target_rows = target_rows.withColumn('list', F.lit(True))
    # DataFrame filter rather than rdd.filter(...).toDF(): toDF() cannot infer a
    # schema from an empty RDD, which happened when `vid` was the only row.
    # eqNullSafe keeps rows whose vid is null, as the Python `!=` filter did.
    other_rows = infoDF.filter(~is_target)
    result = other_rows.union(target_rows)
    result.show()
    saveProfileInfo(result, profile_path)

In [None]:
# FIXME: unfinished route stub. A bare `@app.route('/')` with nothing to decorate is a
# SyntaxError, and '/' is already routed to hello_world(). Add the view function here,
# on its own path, before restoring the decorator.

In [17]:
class HelloWorld(Resource):
    """REST resource that exposes a profile's liked video ids."""

    def get(self, uid, profile):
        """Handle GET by returning the liked video ids for ``uid``/``profile``."""
        liked_ids = getLikedVideos(uid, profile)
        return liked_ids


# uid is parsed as an int and profile as a string from the URL
api.add_resource(HelloWorld, '/hello/<int:uid>/<string:profile>')

In [None]:
# Start Flask's built-in server (debug off) when this is executed as the main module
if __name__ == '__main__':
    app.run(debug=False)

 * Serving Flask app "__main__" (lazy loading)
 * Environment: production
[2m   Use a production WSGI server instead.[0m
 * Debug mode: off


 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)


+---+-----+------+-----+
|vid|liked|viewed| list|
+---+-----+------+-----+
|  1| true|  true| true|
|  2|false|  true|false|
|  3|false|  true| true|
|  0| true|  true|false|
|  4|false|  true| true|
|  5| true| false|false|
+---+-----+------+-----+

+---+-----+------+-----+
|vid|liked|viewed| list|
+---+-----+------+-----+
|  1| true|  true| true|
|  0| true|  true|false|
|  5| true| false|false|
+---+-----+------+-----+



127.0.0.1 - - [20/Nov/2020 06:19:32] "[37mGET /hello/0/test HTTP/1.1[0m" 200 -
