In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [0]:
!wget -q www-us.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz

In [0]:
!tar -xvf spark-2.4.3-bin-hadoop2.7.tgz

In [0]:
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.3-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pandas.plotting import scatter_matrix
import seaborn as sns
import numpy as np
from sklearn.linear_model import LinearRegression
from pyspark.sql.functions import array, col, explode, struct, lit

import warnings
warnings.simplefilter('ignore')

import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

spark = SparkSession.builder.master("local[*]").getOrCreate()

## 7. Загрузить данные в Spark

In [0]:
df_data = spark.read.csv("u.data", sep='\t', header=None, inferSchema=True)
df_genre = spark.read.csv("u.genre", sep='|', header=None, inferSchema=True)
df_info = spark.read.csv("u.info", sep=' ', header=None, inferSchema=True)
df_occupation = spark.read.csv("u.occupation", sep=' ', header=None, inferSchema=True)
df_user = spark.read.csv("u.user", sep='|', header=None, inferSchema=True)
df_item = spark.read.csv("u.item", sep='|', header=None, inferSchema=True)
# encoding='latin_1'

In [0]:
# new_names_df_data = ['user_id', 'movie_id', 'rating', 'timestamp']
# df_data = df_data.toDF(*new_names_df_data)

# new_names_df_genre = ['genres', 'genres_id']
# df_genre = df_genre.toDF(*new_names_df_genre)

# new_names_df_user = ['user_id', 'age', 'gender', 'occupation', 'zip_code']
# df_user = df_user.toDF(*new_names_df_user)

# new_names_df_item = ['movie_id', 'movie_title', 'release_date', 
#                      'video_release_date', 'IMDb_URL', 'unknown', 
#                      'Action', 'Adventure', 'Animation', "Children's", 
#                      'Comedy', 'Crime', 'Documentary', 'Drama', 'Fantasy', 
#                      'Film-Noir', 'Horror', 'Musical', 'Mystery', 'Romance', 
#                      'Sci-Fi', 'Thriller', 'War', 'Western']
# df_item = df_item.toDF(*new_names_df_item)

In [0]:
df_data.show()

+---+----+---+---------+
|_c0| _c1|_c2|      _c3|
+---+----+---+---------+
|196| 242|  3|881250949|
|186| 302|  3|891717742|
| 22| 377|  1|878887116|
|244|  51|  2|880606923|
|166| 346|  1|886397596|
|298| 474|  4|884182806|
|115| 265|  2|881171488|
|253| 465|  5|891628467|
|305| 451|  3|886324817|
|  6|  86|  3|883603013|
| 62| 257|  2|879372434|
|286|1014|  5|879781125|
|200| 222|  5|876042340|
|210|  40|  3|891035994|
|224|  29|  3|888104457|
|303| 785|  3|879485318|
|122| 387|  5|879270459|
|194| 274|  2|879539794|
|291|1042|  4|874834944|
|234|1184|  2|892079237|
+---+----+---+---------+
only showing top 20 rows



In [0]:
df_data.dtypes

[('_c0', 'int'), ('_c1', 'int'), ('_c2', 'int'), ('_c3', 'int')]

In [0]:
df_data.describe().show()

+-------+------------------+------------------+------------------+-----------------+
|summary|               _c0|               _c1|               _c2|              _c3|
+-------+------------------+------------------+------------------+-----------------+
|  count|            100000|            100000|            100000|           100000|
|   mean|         462.48475|         425.53013|           3.52986|8.8352885148862E8|
| stddev|266.61442012750905|330.79835632558473|1.1256735991443214|5343856.189502848|
|    min|                 1|                 1|                 1|        874724710|
|    max|               943|              1682|                 5|        893286638|
+-------+------------------+------------------+------------------+-----------------+



## 8. Средствами спарка вывести среднюю оценку для каждого фильма

In [0]:
df_data.columns

df_data = df_data.withColumnRenamed('_c0', 'user id')\
    .withColumnRenamed('_c1', 'item id')\
    .withColumnRenamed('_c2', 'rating')\
    .withColumnRenamed('_c3', 'timestamp');

df_data_grp = df_data.groupby('item id')
df_data_grp_mean = df_data_grp.mean('rating')

df_data_grp_mean.show()

+-------+------------------+
|item id|       avg(rating)|
+-------+------------------+
|    496| 4.121212121212121|
|    471|3.6108597285067874|
|    463| 3.859154929577465|
|    148|          3.203125|
|   1342|               2.5|
|    833| 3.204081632653061|
|   1088| 2.230769230769231|
|   1591|3.1666666666666665|
|   1238|             3.125|
|   1580|               1.0|
|   1645|               4.0|
|    392|3.5441176470588234|
|    623| 2.923076923076923|
|    540| 2.511627906976744|
|    858|               1.0|
|    737| 2.983050847457627|
|    243|2.4393939393939394|
|   1025|2.9318181818181817|
|   1084| 3.857142857142857|
|   1127| 2.909090909090909|
+-------+------------------+
only showing top 20 rows



## 9. В Spark получить 2 df с 5-ю самыми популярными и самыми непопулярными фильмами (по количеству оценок, либо по самой оценке)

In [0]:
df_items = df_item['_c0', '_c1']

df_items = df_items.withColumnRenamed('_c0','item id')\
    .withColumnRenamed('_c1','movie title')

df_data_grp_mp = spark.createDataFrame(df_data_grp.count().orderBy('count', ascending=False).take(5)) 
df_data_grp_lp = spark.createDataFrame(df_data_grp.count().orderBy('count', ascending=True).take(5)) 

df_data_grp_mp.join(df_items, 'item id', how='inner').show()
df_data_grp_lp.join(df_items, 'item id', how='inner').show()

+-------+-----+--------------------+
|item id|count|         movie title|
+-------+-----+--------------------+
|     50|  583|    Star Wars (1977)|
|    258|  509|      Contact (1997)|
|    100|  508|        Fargo (1996)|
|    181|  507|Return of the Jed...|
|    294|  485|    Liar Liar (1997)|
+-------+-----+--------------------+

+-------+-----+--------------------+
|item id|count|         movie title|
+-------+-----+--------------------+
|   1460|    1|    Sleepover (1995)|
|   1507|    1|Three Lives and O...|
|   1580|    1|     Liebelei (1933)|
|   1645|    1|Butcher Boy, The ...|
|   1618|    1|King of New York ...|
+-------+-----+--------------------+



## 10. Средствами Spark соедините информацию по фильмам и жанрам (u.genre)

In [0]:
df_item.show()
df_genre.show()

+---+--------------------+-----------+----+--------------------+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|_c0|                 _c1|        _c2| _c3|                 _c4|_c5|_c6|_c7|_c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|_c19|_c20|_c21|_c22|_c23|
+---+--------------------+-----------+----+--------------------+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|  1|    Toy Story (1995)|01-Jan-1995|null|http://us.imdb.co...|  0|  0|  0|  1|  1|   1|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|
|  2|    GoldenEye (1995)|01-Jan-1995|null|http://us.imdb.co...|  0|  1|  1|  0|  0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   1|   0|   0|
|  3|   Four Rooms (1995)|01-Jan-1995|null|http://us.imdb.co...|  0|  0|  0|  0|  0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   1|   0|   0|
|  4|   Get Shorty (1995)|01-Jan-1995|null|http://us.imdb.co...|  0|  

In [0]:
df_item = df_item.withColumnRenamed('_c0','item id')\
            .withColumnRenamed('_c1','movie title')\
             .withColumnRenamed('_c5','unknown')\
        .withColumnRenamed('_c6','Action')\
        .withColumnRenamed('_c7','Adventure')\
        .withColumnRenamed('_c8','Animation')\
        .withColumnRenamed('_c9','Children\'s')\
        .withColumnRenamed('_c10','Comedy')\
        .withColumnRenamed('_c11','Crime')\
        .withColumnRenamed('_c12','Documentary')\
        .withColumnRenamed('_c13','Drama')\
        .withColumnRenamed('_c14','Fantasy')\
        .withColumnRenamed('_c15','Film-Noir')\
        .withColumnRenamed('_c16','Horror')\
        .withColumnRenamed('_c17','Musical')\
        .withColumnRenamed('_c18','Mystery')\
        .withColumnRenamed('_c19','Romance')\
        .withColumnRenamed('_c20','Sci-Fi')\
        .withColumnRenamed('_c21','Thriller')\
        .withColumnRenamed('_c22','War')\
        .withColumnRenamed('_c23','Western')\
    
df_item = df_item['item id', 'unknown', 'Action', 'Adventure', 'Animation',
              'Children\'s', 'Comedy', 'Crime', 'Documentary', 'Drama', 'Fantasy',
              'Film-Noir', 'Horror', 'Musical','Mystery','Romance', 'Sci-Fi',
              'Thriller', 'War','Western']

In [0]:
def to_long(df, by):

    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    kvs = explode(array([
      struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])

df_g_trans = to_long(df_item, ["item id"])
df_g_trans = df_g_trans.where(df_g_trans['val'] > 0)

df_res = df_g_trans.join(df_items, 'item id', how='inner')['item id', 'movie title', 'key']\
    .withColumnRenamed('key','genre')
df_res = df_res.join(df_data_grp_mean, 'item id', how='inner') 
df_res.show()

+-------+--------------------+----------+------------------+
|item id|         movie title|     genre|       avg(rating)|
+-------+--------------------+----------+------------------+
|      1|    Toy Story (1995)| Animation|3.8783185840707963|
|      1|    Toy Story (1995)|Children's|3.8783185840707963|
|      1|    Toy Story (1995)|    Comedy|3.8783185840707963|
|      2|    GoldenEye (1995)|    Action|3.2061068702290076|
|      2|    GoldenEye (1995)| Adventure|3.2061068702290076|
|      2|    GoldenEye (1995)|  Thriller|3.2061068702290076|
|      3|   Four Rooms (1995)|  Thriller| 3.033333333333333|
|      4|   Get Shorty (1995)|    Action| 3.550239234449761|
|      4|   Get Shorty (1995)|    Comedy| 3.550239234449761|
|      4|   Get Shorty (1995)|     Drama| 3.550239234449761|
|      5|      Copycat (1995)|     Crime| 3.302325581395349|
|      5|      Copycat (1995)|     Drama| 3.302325581395349|
|      5|      Copycat (1995)|  Thriller| 3.302325581395349|
|      6|Shanghai Triad 

## 11. Посчитайте средствами Spark среднюю оценку для каждого жанра

In [0]:
df_data_grp_g = df_res.groupby('genre')
df_data_grp_mean_g = df_data_grp_g.mean('avg(rating)')
df_data_grp_mean_g.show()

+-----------+------------------+
|      genre|  avg(avg(rating))|
+-----------+------------------+
|      Crime|3.2110147495997547|
|    Romance|3.2440490301647995|
|   Thriller| 3.136692475563836|
|  Adventure|3.1439673136976105|
|    unknown|2.2222222222222223|
| Children's| 2.916884891868897|
|      Drama|3.1873534142973314|
|        War| 3.489185428943569|
|Documentary| 3.229273094093941|
|    Fantasy| 2.849830297920943|
|    Mystery| 3.336813932383023|
|    Musical| 3.376423123539873|
|  Animation|3.2988130803823776|
|  Film-Noir|3.5483508757184237|
|     Horror| 2.730157352996138|
|    Western| 3.185617473056144|
|     Comedy|3.0005649618406727|
|     Action| 2.966332403758986|
|     Sci-Fi|3.1654460532512894|
+-----------+------------------+



In [0]:
pass