In [1]:
import pandas as pd
from os import listdir

In [6]:
localdir = '/home/lexzhe/BigDataMl/hw2'

In [2]:

datafiles = listdir(localdir + '/Archive')
books = []
ratings = []
for filename in datafiles:
    df = pd.read_csv(localdir + '/Archive/' + filename)
    df.columns = df.columns.str.lower()
    if filename.startswith('book'):
        books.append(df)
    else:
        ratings.append(df)
booksdf = pd.concat(books)
ratings = pd.concat(ratings)

In [15]:
booksdf.to_parquet('books1.parquet')
booksdf.to_csv('books1.csv')
ratings.to_parquet('ratings.parquet')

In [4]:
import subprocess


def run_linux_cmd(args_list):
        print('Running system command: {0}'.format(' '.join(args_list)))
        proc = subprocess.Popen(args_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        s_output, s_err = proc.communicate()
        s_return =  proc.returncode
        return s_return, s_output, s_err

In [16]:
(ret, out, err)= run_linux_cmd(['hdfs', 'dfs', '-put', 'books1.parquet', '/data/'])
print(ret)

Running system command: hdfs dfs -put books1.parquet /data/
0


In [17]:
(ret, out, err)= run_linux_cmd(['hdfs', 'dfs', '-put', 'books1.csv', '/data/'])
print(ret)

Running system command: hdfs dfs -put books1.csv /data/
0


In [9]:
(ret, out, err)= run_linux_cmd(['hdfs', 'dfs', '-put', 'ratings.parquet', '/data/'])
print(ret)

Running system command: hdfs dfs -put ratings.parquet /data/
0


In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('spark://MSI.localdomain:7077').getOrCreate()

In [2]:
spark.catalog.clearCache()

In [3]:
%%time
csvBooks=spark.read.csv('hdfs://localhost:9000/data/books1.csv')


CPU times: user 2.35 ms, sys: 2.23 ms, total: 4.58 ms
Wall time: 7.54 s


In [4]:
%%time
books=spark.read.parquet("hdfs://localhost:9000/data/books1.parquet")

CPU times: user 3.74 ms, sys: 0 ns, total: 3.74 ms
Wall time: 1.05 s


In [7]:
import os

parquet_stats = os.stat(localdir + '/books1.parquet')
csv_stats = os.stat(localdir + '/books1.csv')
print(f'Parquet File Size in Bytes is  {parquet_stats.st_size}')
print(f'Csv File Size in Bytes is     {csv_stats.st_size}')

Parquet File Size in Bytes is  717234434
Csv File Size in Bytes is     1178082266


Как видно выше, csv файл загружается гораздо медленнее и занимает больше объема.

In [8]:
%%time
ratings=spark.read.parquet("hdfs://localhost:9000/data/ratings.parquet")
ratings.show()

+----+------+--------------------+-----------------+
|  id|  name|              rating|__index_level_0__|
+----+------+--------------------+-----------------+
|2098|Rating|This user doesn't...|                0|
|2099|Rating|This user doesn't...|                1|
|2103|Rating|This user doesn't...|                2|
|2105|Rating|This user doesn't...|                3|
|2106|Rating|This user doesn't...|                4|
|2108|Rating|This user doesn't...|                5|
|2109|Rating|This user doesn't...|                6|
|2110|Rating|This user doesn't...|                7|
|2113|Rating|This user doesn't...|                8|
|2116|Rating|This user doesn't...|                9|
|2117|Rating|This user doesn't...|               10|
|2118|Rating|This user doesn't...|               11|
|2119|Rating|This user doesn't...|               12|
|2121|Rating|This user doesn't...|               13|
|2122|Rating|This user doesn't...|               14|
|2123|Rating|This user doesn't...|            

In [9]:
books.show()

+---+--------------------+-----------+-----------+-----------+---------------+------------+----------+--------------------+--------------+-----------+--------+--------------------+------------------+-----------+-----------+----------+-----------+-----------+---------------------+-----------------+
| id|                name|ratingdist1|pagesnumber|ratingdist4|ratingdisttotal|publishmonth|publishday|           publisher|countsofreview|publishyear|language|             authors|            rating|ratingdist2|ratingdist5|      isbn|ratingdist3|description|count of text reviews|__index_level_0__|
+---+--------------------+-----------+-----------+-----------+---------------+------------+----------+--------------------+--------------+-----------+--------+--------------------+------------------+-----------+-----------+----------+-----------+-----------+---------------------+-----------------+
|  1|Harry Potter and ...|     1:9896|        652|   4:556485|  total:2298124|          16|         9| 

In [15]:
from pyspark.sql import functions as F

In [18]:
type(ratings)

pyspark.sql.dataframe.DataFrame

In [25]:
types.rating.to_numpy()

array(['did not like it', 'really liked it', 'liked it', 'it was ok',
       'it was amazing', "This user doesn't have any rating"],
      dtype=object)

In [29]:
relevantratings = ratings.filter(ratings.rating !="This user doesn't have any rating")
relevantratings.show()

+----+---------------+---------------+-----------------+
|  id|           name|         rating|__index_level_0__|
+----+---------------+---------------+-----------------+
|2101|Of Mice and Men|       liked it|              580|
|2154|Of Mice and Men|       liked it|              581|
|2158|Of Mice and Men|       liked it|              582|
|2186|Of Mice and Men|really liked it|              583|
|2195|Of Mice and Men|       liked it|              584|
|2229|Of Mice and Men| it was amazing|              585|
|2259|Of Mice and Men|really liked it|              586|
|2282|Of Mice and Men| it was amazing|              587|
|2383|Of Mice and Men|really liked it|              588|
|2429|Of Mice and Men|       liked it|              589|
|2452|Of Mice and Men|       liked it|              590|
|2613|Of Mice and Men|really liked it|              591|
|2635|Of Mice and Men|really liked it|              592|
|2636|Of Mice and Men|       liked it|              593|
|2644|Of Mice and Men| it was a

In [38]:
types = relevantratings.select("rating").distinct().toPandas().rating.to_numpy().tolist()
numericalrating = [0, 4, 3, 2, 5]
mapper = dict(zip(types, numericalrating))

In [41]:
from pyspark.sql.types import IntegerType

def translate(mapping):
    def translate_(col):
        return mapping.get(col)
    return F.udf(translate_, IntegerType())

numratings = relevantratings.withColumn('num_rating', translate(mapper)("rating")).select('name', 'num_rating', 'id')
numratings.show()

+---------------+----------+----+
|           name|num_rating|  id|
+---------------+----------+----+
|Of Mice and Men|         3|2101|
|Of Mice and Men|         3|2154|
|Of Mice and Men|         3|2158|
|Of Mice and Men|         4|2186|
|Of Mice and Men|         3|2195|
|Of Mice and Men|         5|2229|
|Of Mice and Men|         4|2259|
|Of Mice and Men|         5|2282|
|Of Mice and Men|         4|2383|
|Of Mice and Men|         3|2429|
|Of Mice and Men|         3|2452|
|Of Mice and Men|         4|2613|
|Of Mice and Men|         4|2635|
|Of Mice and Men|         3|2636|
|Of Mice and Men|         5|2644|
|Of Mice and Men|         2|2670|
|Of Mice and Men|         5|2682|
|Of Mice and Men|         3|2685|
|Of Mice and Men|         4|2696|
|Of Mice and Men|         2|2818|
+---------------+----------+----+
only showing top 20 rows



In [42]:
books.show()

+---+--------------------+-----------+-----------+-----------+---------------+------------+----------+--------------------+--------------+-----------+--------+--------------------+------------------+-----------+-----------+----------+-----------+-----------+---------------------+-----------------+
| id|                name|ratingdist1|pagesnumber|ratingdist4|ratingdisttotal|publishmonth|publishday|           publisher|countsofreview|publishyear|language|             authors|            rating|ratingdist2|ratingdist5|      isbn|ratingdist3|description|count of text reviews|__index_level_0__|
+---+--------------------+-----------+-----------+-----------+---------------+------------+----------+--------------------+--------------+-----------+--------+--------------------+------------------+-----------+-----------+----------+-----------+-----------+---------------------+-----------------+
|  1|Harry Potter and ...|     1:9896|        652|   4:556485|  total:2298124|          16|         9| 

In [45]:
joined = numratings.join(books, numratings.name == books.name, 'left')
joined.show()

+---------------+----------+----+-------+---------------+-----------+-----------+-----------+---------------+------------+----------+--------------------+--------------+-----------+--------+--------------+------+-----------+-----------+----------+-----------+--------------------+---------------------+-----------------+
|           name|num_rating|  id|     id|           name|ratingdist1|pagesnumber|ratingdist4|ratingdisttotal|publishmonth|publishday|           publisher|countsofreview|publishyear|language|       authors|rating|ratingdist2|ratingdist5|      isbn|ratingdist3|         description|count of text reviews|__index_level_0__|
+---------------+----------+----+-------+---------------+-----------+-----------+-----------+---------------+------------+----------+--------------------+--------------+-----------+--------+--------------+------+-----------+-----------+----------+-----------+--------------------+---------------------+-----------------+
|Of Mice and Men|         3|2101|    

In [None]:
books.filter(books.name == 'Of Mice and Men').show()

In [None]:
ratings.filter(ratings.name == 'Of Mice and Men').show()

Топ-10 книг с наибольшим числом ревью:

In [50]:
numratings.groupBy('name').count().sort(F.desc('count')).show(10)

+--------------------+-----+
|                name|count|
+--------------------+-----+
|The Catcher in th...|  985|
|    The Great Gatsby|  885|
|The Da Vinci Code...|  846|
|To Kill a Mocking...|  830|
|                1984|  756|
|     The Kite Runner|  749|
|Harry Potter and ...|  728|
|         Animal Farm|  717|
|Harry Potter and ...|  639|
|Harry Potter and ...|  631|
+--------------------+-----+
only showing top 10 rows



Топ-10 издателей с наибольшим средним числом страниц в книгах:

In [55]:
books.groupBy('publisher').mean('pagesnumber').sort(F.desc('avg(pagesnumber)')).show(10)

+--------------------+------------------+
|           publisher|  avg(pagesnumber)|
+--------------------+------------------+
|Crafty Secrets Pu...|         1807321.6|
|    Sacred-texts.com|          500000.0|
|Department of Rus...| 322128.5714285714|
|Logos Research Sy...|          100000.0|
|Encyclopedia Brit...|           32642.0|
|Progressive Manag...|        19106.3625|
|Still Waters Revi...|10080.142857142857|
|P. Shalom Publica...|            8539.0|
|Hendrickson Publi...|            6448.0|
|            IEEE/EMB|            6000.0|
+--------------------+------------------+
only showing top 10 rows



Десять наиболее активных по числу изданных книг лет

In [57]:
books.groupBy('publishyear').count().sort(F.desc('count')).show(10)

+-----------+------+
|publishyear| count|
+-----------+------+
|       2007|129507|
|       2006|122374|
|       2005|117639|
|       2004|105733|
|       2003|104345|
|       2002| 95537|
|       2001| 88228|
|       2000| 87290|
|       2008| 80265|
|       1999| 80155|
+-----------+------+
only showing top 10 rows



Топ-10 книг имеющих наибольший разброс в оценках среди книг имеющих больше 500
оценок

In [71]:
aggregated = numratings.groupBy('name').agg(F.count('name').alias('counted'), F.min('num_rating').alias('min_rating'), F.max('num_rating').alias('max_rating'))

aggregated.filter(aggregated.counted > 500).withColumn("deviance", aggregated.max_rating - aggregated.min_rating).sort(F.desc('deviance')).show(10)


+--------------------+-------+----------+----------+--------+
|                name|counted|min_rating|max_rating|deviance|
+--------------------+-------+----------+----------+--------+
|To Kill a Mocking...|    830|         0|         5|       5|
|Me Talk Pretty On...|    535|         0|         5|       5|
|The Da Vinci Code...|    846|         0|         5|       5|
| Memoirs of a Geisha|    574|         0|         5|       5|
|           Middlesex|    529|         0|         5|       5|
|                1984|    756|         0|         5|       5|
|    The Great Gatsby|    885|         0|         5|       5|
|Harry Potter and ...|    728|         0|         5|       5|
|Harry Potter and ...|    593|         0|         5|       5|
|Harry Potter and ...|    503|         0|         5|       5|
+--------------------+-------+----------+----------+--------+
only showing top 10 rows



In [82]:
years = (books.select("publishyear").distinct().toPandas().publishyear.to_numpy().tolist())
years.sort()
print (years)

[1, 8, 162, 199, 200, 202, 208, 299, 1192, 1376, 1384, 1385, 1387, 1623, 1730, 1753, 1824, 1825, 1833, 1835, 1836, 1837, 1838, 1839, 1841, 1846, 1847, 1850, 1851, 1852, 1854, 1856, 1859, 1862, 1863, 1864, 1865, 1866, 1868, 1869, 1871, 1874, 1878, 1879, 1880, 1881, 1884, 1885, 1886, 1888, 1889, 1892, 1893, 1894, 1895, 1896, 1897, 1898, 1899, 1900, 1901, 1902, 1903, 1904, 1905, 1906, 1907, 1908, 1909, 1910, 1911, 1912, 1913, 1914, 1915, 1916, 1917, 1918, 1919, 1920, 1921, 1922, 1923, 1924, 1925, 1926, 1927, 1928, 1929, 1930, 1931, 1932, 1933, 1934, 1935, 1936, 1937, 1938, 1939, 1940, 1941, 1942, 1943, 1944, 1945, 1946, 1947, 1948, 1949, 1950, 1951, 1952, 1953, 1954, 1955, 1956, 1957, 1958, 1959, 1960, 1961, 1962, 1963, 1964, 1965, 1966, 1967, 1968, 1969, 1970, 1971, 1972, 1973, 1974, 1975, 1976, 1977, 1978, 1979, 1980, 1981, 1982, 1983, 1984, 1985, 1986, 1987, 1988, 1989, 1990, 1991, 1992, 1993, 1994, 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 200

1)В колонке года есть явные артефакты
2)Издатели выпустившие наибольшее число книг за последние 10 лет:

In [83]:
books.filter((books.publishyear > 2013) & (books.publishyear < 2023)).groupBy('publisher').count().sort(F.desc('count')).show(10)

+--------------------+-----+
|           publisher|count|
+--------------------+-----+
|           Routledge|  425|
|          de Gruyter|  350|
|             Linkgua|  280|
|       Samuel French|  248|
|   Walter de Gruyter|  158|
|Oxford University...|  156|
|             Pearson|  144|
|   Samuel French Ltd|  138|
|  Dover Publications|  130|
|           Passbooks|  124|
+--------------------+-----+
only showing top 10 rows

