In [640]:
import os
import sys
# Point PySpark at the cluster's Python interpreter and the HDP Spark 2 install,
# and request 3 executors for the shell session.
os.environ.update({
    "PYSPARK_PYTHON": '/opt/anaconda/envs/bd9/bin/python',
    "SPARK_HOME": '/usr/hdp/current/spark2-client',
    "PYSPARK_SUBMIT_ARGS": '--num-executors 3 pyspark-shell',
})

spark_home = os.environ.get('SPARK_HOME', None)

# Make the bundled pyspark package and py4j importable; py4j ends up first on sys.path.
sys.path[0:0] = [
    os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'),
    os.path.join(spark_home, 'python'),
]

In [641]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
import json
import re

# Empty SparkConf: no settings are added here; the executor count comes from
# PYSPARK_SUBMIT_ARGS set in the previous cell.
conf = SparkConf()

# getOrCreate() reuses an already-running session instead of starting a second one.
spark = SparkSession.builder.config(conf=conf).appName('test').getOrCreate()

In [642]:
# Handle to the underlying SparkContext; used for the raw RDD read and the final stop().
sc = spark.sparkContext

In [643]:
# List the lab input directory on HDFS (IPython shell escape).
!hdfs dfs -ls -h /labs/slaba02

Found 1 items
-rw-r--r--   3 hdfs hdfs     66.3 M 2022-01-06 18:46 /labs/slaba02/DO_record_per_line.json


In [644]:
# Raw line-per-record view of the input file.
# NOTE(review): this RDD is only inspected and repartitioned; the pipeline below
# reads the same file again via spark.read.json.
rdd = sc.textFile('/labs/slaba02/DO_record_per_line.json')

In [645]:
# Number of partitions Spark chose for the raw text RDD.
rdd.getNumPartitions()

2

In [646]:
# NOTE(review): the repartitioned RDD is not used by any later cell; candidate for removal.
rdd = rdd.repartition(6)

In [647]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, RegexTokenizer

In [648]:
# NOTE(review): this schema is never used. The JSON below is read without a schema,
# and `schema` is reassigned before it is passed anywhere. Its `course_id` field
# also does not match the input's `id` column.
schema = StructType([
    StructField('course_id', IntegerType()),
    StructField('lang', StringType()),
    StructField('name', StringType()),
    StructField('words', ArrayType(StringType())),
])

In [649]:
# Load one JSON course record per line into a DataFrame (no schema passed, so Spark infers it).
df = spark.read.json('/labs/slaba02/DO_record_per_line.json', multiLine=False)

In [650]:
# Split each course description into lower-cased tokens using RegexTokenizer's
# default whitespace pattern; the tokens land in the "words" column.
tokenizer = RegexTokenizer(outputCol='words', inputCol='desc')
wordsData = tokenizer.transform(df)
wordsData.show(n=5)

+--------------------+--------------------+---+----+--------------------+--------------+--------------------+
|                 cat|                desc| id|lang|                name|      provider|               words|
+--------------------+--------------------+---+----+--------------------+--------------+--------------------+
|3/business_manage...|This course intro...|  4|  en|Accounting Cycle:...|Canvas Network|[this, course, in...|
|              11/law|This online cours...|  5|  en|American Counter ...|Canvas Network|[this, online, co...|
|5/computer_scienc...|This course is ta...|  6|  fr|Arithmétique: en ...|Canvas Network|[this, course, is...|
|  14/social_sciences|We live in a digi...|  7|  en|Becoming a Dynami...|Canvas Network|[we, live, in, a,...|
|2/biology_life_sc...|This self-paced c...|  8|  en|           Bioethics|Canvas Network|[this, self-paced...|
+--------------------+--------------------+---+----+--------------------+--------------+--------------------+
only showing top 5 rows

In [651]:
# Hash each token list into a fixed 10000-bucket term-frequency vector ("rawFeatures").
# Distinct tokens may collide in one bucket; the bucket count bounds the vector size.
hashingTF = HashingTF(numFeatures=10000, inputCol='words', outputCol='rawFeatures')
featurizedData = hashingTF.transform(wordsData)

In [652]:
# Learn corpus-wide inverse document frequencies from the TF vectors, then
# reweight them into TF-IDF vectors stored in the "features" column.
idf = IDF(outputCol="features", inputCol="rawFeatures")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

In [653]:
# Keep only the identifying columns plus the TF-IDF vector.
df2 = rescaledData.select(['id', 'lang', 'name', 'features'])

In [654]:
# Peek at the English-language courses.
df2.filter(df2.lang == 'en').show()

+---+----+--------------------+--------------------+
| id|lang|                name|            features|
+---+----+--------------------+--------------------+
|  4|  en|Accounting Cycle:...|(10000,[36,42,63,...|
|  5|  en|American Counter ...|(10000,[32,222,29...|
|  7|  en|Becoming a Dynami...|(10000,[493,572,7...|
|  8|  en|           Bioethics|(10000,[32,65,115...|
|  9|  en|College Foundatio...|(10000,[56,91,268...|
| 10|  en|Digital Literacies I|(10000,[1045,1263...|
| 11|  en|Digital Literacie...|(10000,[87,157,57...|
| 12|  en|Digital Tools for...|(10000,[161,164,4...|
| 13|  en|Discover Your Val...|(10000,[26,1072,1...|
| 14|  en|Enhancing Patient...|(10000,[63,145,23...|
| 15|  en|Ethics and Values...|(10000,[32,65,77,...|
| 16|  en| Exploring Chemistry|(10000,[32,273,30...|
| 17|  en|Exploring Enginee...|(10000,[695,1420,...|
| 18|  en|Fairy Tales: Orig...|(10000,[307,316,3...|
| 19|  en|First Peoples to ...|(10000,[572,768,8...|
| 20|  en| Forums for a Future|(10000,[91,273,

In [655]:
# Register the TF-IDF table so SQL queries can reference it as "df2".
df2.createOrReplaceTempView("df2")

In [656]:
# English-only slice of the TF-IDF table registered as "df2".
# The original referenced a table `david` and a column `rawFeatures`; neither
# exists in df2 (its columns are id, lang, name, features).
# NOTE(review): `query` is reassigned before it is executed.
query = '''
SELECT id, lang, name, features
from df2
where lang = 'en'
'''

In [657]:
# Column types of the TF-IDF table; `features` holds the ML vectors.
df2.dtypes

[('id', 'bigint'),
 ('lang', 'string'),
 ('name', 'string'),
 ('features', 'vector')]

In [658]:
from pyspark.ml.linalg import Vectors

In [659]:
import pyspark.sql.functions as f


In [660]:
import math
def cos_dist(x, y):
    """Cosine similarity of two vectors (despite the name, this is not a distance).

    Larger means more similar: 1.0 for vectors pointing the same way, 0.0 for
    orthogonal ones. Callers sort by this value in descending order.

    Args:
        x, y: objects exposing ``dot(other)`` and ``norm(p)``, such as the
            ``pyspark.ml.linalg`` vectors in the TF-IDF ``features`` column.

    Returns:
        The similarity, or NaN when either vector has zero norm (e.g. a document
        with no hashed terms). The similarity is undefined in that case. NaN is
        returned explicitly instead of dividing by zero, which raises
        ZeroDivisionError for plain floats and emits a RuntimeWarning for numpy
        scalars. Spark's na.drop treats NaN like null, so such pairs can be
        filtered out later.
    """
    denom = x.norm(2) * y.norm(2)
    if denom == 0:
        # Similarity with an all-zero vector is undefined.
        return float("nan")
    return x.dot(y) / denom

In [661]:
# Target courses to build recommendations for. Filter through df2's own `id` column:
# `df` is reassigned later in the notebook (to the scored pairs, which have no
# `id`), so referencing `df.id` here fails when this cell is re-run.
courses = df2.filter(df2.id.isin([21617, 23126, 16627, 11556, 16704, 13702]))

In [662]:
# Show the target courses with their languages.
courses.show()

+-----+----+--------------------+--------------------+
|   id|lang|                name|            features|
+-----+----+--------------------+--------------------+
|11556|  es|Aprendizaje Colab...|(10000,[249,522,5...|
|13702|  ru|Математическая эк...|(10000,[310,763,9...|
|16627|  es|Aprende Excel: Ni...|(10000,[30,145,19...|
|16704|  ru|Программирование ...|(10000,[381,1144,...|
|21617|  en|Preparing for the...|(10000,[213,360,4...|
|23126|  en|Compass - powerfu...|(10000,[87,91,96,...|
+-----+----+--------------------+--------------------+



In [663]:
# Expose the target courses to SQL as "courses".
courses.createOrReplaceTempView("courses")

In [664]:
# Join predicate for candidate pairs: same language, excluding the course itself.
# NOTE(review): `courses` is a filter of `df2`, so both sides share column lineage;
# columns selected from `df2` after such a self-join may resolve to the left side.
cond = [(courses.lang == df2.lang) & (courses.id != df2.id)]

In [665]:
# DataFrame-API form of the same-language pair join. The SQL query that follows is
# what actually feeds the scoring; this frame is not used further.
# `courses` is a filter of `df2`, so both sides share column lineage. Selecting
# `df2.id` / `df2.features` after a plain self-join resolves them to the left
# (courses) side, which makes d_id equal c_id. Aliasing each side and addressing
# columns through the alias keeps the two sides apart.
# Bound to its own name so it does not shadow the `f` alias of pyspark.sql.functions.
pairs_df = (
    courses.alias('c')
    .join(
        df2.alias('d'),
        (F.col('c.lang') == F.col('d.lang')) & (F.col('c.id') != F.col('d.id')),
        how='inner',
    )
    .select(
        F.col('c.id').alias('c_id'),
        F.col('d.id').alias('d_id'),
        F.col('c.features').alias('v1'),
        F.col('d.features').alias('v2'),
    )
)

In [666]:
# Pair every target course with every other course in the same language, carrying
# both TF-IDF vectors and the candidate's name. Column order (c_id, d_id, v1, v2,
# name) matters: the scoring map indexes rows positionally.
query = '''
SELECT c.id as c_id
       ,d.id as d_id 
       ,c.features as v1
       ,d.features as v2
       ,d.name
from courses c
    join df2 d on
        c.lang = d.lang
        and c.id != d.id
'''

In [667]:
# Score every (target, candidate) pair from the SQL join into tuples of
# (target id, cosine similarity, candidate id, candidate name).
# coalesce() only merges partitions; it never raises their count above the input's.
f = (
    spark.sql(query)
    .rdd
    .coalesce(12)
    .map(lambda row: (int(row.c_id), float(cos_dist(row.v1, row.v2)), int(row.d_id), row.name))
)

In [668]:
# Schema of the scored pairs: target course id, cosine similarity,
# recommended course id, recommended course name (all nullable).
schema = (StructType()
          .add('task_id', IntegerType())
          .add('cos_dist', FloatType())
          .add('rec_id', IntegerType())
          .add('rec_name', StringType()))

In [669]:
# Materialize the scored-pair RDD as a DataFrame with the explicit schema.
# NOTE(review): this rebinds `df`, which previously held the raw course records.
df = spark.createDataFrame(f, schema = schema)

In [670]:
# Rank candidates within each target course: highest similarity first, ties broken
# by name and then id. Cached because the frame is filtered once per target course.
df = df.orderBy(['task_id', 'cos_dist', 'rec_name', 'rec_id'], ascending=[True, False, True, True]).cache()

In [671]:
import pyspark.sql.functions as f

In [673]:
# Discard pairs whose similarity is null or NaN (undefined similarity).
df = df.dropna(subset=['cos_dist'])

In [674]:
# String ids of the target courses; these become the keys of the output JSON.
# collect() instead of a hard-coded take(6), so every target course is included
# even if the id list changes.
keys = [str(row[0]) for row in courses.select('id').collect()]

In [675]:
# Top-10 recommended course ids per target course, in `keys` order. take(10)
# relies on `df` already being sorted by descending similarity within each task_id.
answers = [
    [rec[0] for rec in df.filter(df.task_id == key).select('rec_id').take(10)]
    for key in keys
]

In [676]:
import json

# Map each target course id to its list of recommended course ids.
data = dict(zip(keys, answers))

# Use a dedicated handle name. `f` is already bound earlier in the notebook
# (the pyspark.sql.functions alias / the scored RDD), and rebinding it to a
# closed file object makes later cells that use it fail confusingly.
with open("lab02.json", 'w') as out_file:
    json.dump(data, out_file)

In [None]:
#!hdfs dfs -put lab02.json /user/david.badma-khalgaev

In [639]:
# List the user's HDFS home directory, presumably to confirm lab02.json is present.
# NOTE(review): this cell ran as In [639], before the rest of the notebook, so its
# output reflects an earlier upload rather than this run.
!hdfs dfs -ls /user/david.badma-khalgaev

Found 4 items
drwx------   - david.badma-khalgaev david.badma-khalgaev          0 2022-04-25 03:00 /user/david.badma-khalgaev/.Trash
drwxr-xr-x   - david.badma-khalgaev david.badma-khalgaev          0 2022-04-27 07:58 /user/david.badma-khalgaev/.sparkStaging
-rw-r--r--   3 david.badma-khalgaev david.badma-khalgaev        179 2022-04-14 14:10 /user/david.badma-khalgaev/lab01.json
-rw-r--r--   3 david.badma-khalgaev david.badma-khalgaev        457 2022-04-24 19:37 /user/david.badma-khalgaev/lab02.json


In [677]:
# Shut down the SparkContext at the end of the notebook.
sc.stop()