#Лабораторная 2. Формирование отчётов в Apache Spark

##Задание
Сформировать отчёт с информацией о 10 наиболее популярных языках программирования по итогам года за период с 2010 по 2020 годы. Отчёт будет отражать динамику изменения популярности языков программирования и представлять собой набор таблиц "топ-10" для каждого года.

Получившийся отчёт сохранить в формате Apache Parquet.

Для выполнения задания вы можете использовать любую комбинацию Spark API: RDD API, Dataset API, SQL API.

##Набор данных
Архивы сайтов Stack Exchange доступны по адресу https://archive.org/details/stackexchange.

В папке data данного репозитория вам доступны:

выборка данных posts_sample.xml (из stackoverflow.com-Posts.7z),
файл со списком языков programming-languages.csv, собранных с вики-страницы https://en.wikipedia.org/wiki/List_of_programming_languages.
Рекомендуется отлаживать решение на небольшой выборке данных posts_sample.xml.

In [2]:
!pip3 install pyspark==3.0.0

Collecting pyspark==3.0.0
  Downloading pyspark-3.0.0.tar.gz (204.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m204.7/204.7 MB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9 (from pyspark==3.0.0)
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.6/198.6 kB[0m [31m16.3 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.0-py2.py3-none-any.whl size=205044159 sha256=d64ccbe68ea0133e7407637846aacca9d15e254fbe0e83b371680358d2b64e2a
  Stored in directory: /root/.cache/pip/wheels/b1/bb/8b/ca24d3f756f2ed967225b0871898869db676eb5846df5adc56
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0

In [3]:
import pyspark
import os

from datetime import datetime
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import StructType, StructField, StringType, DateType

In [4]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.13.0 pyspark-shell'
sc = SparkSession.builder.appName("L2").master("local[*]").getOrCreate()
sc

In [5]:
pro_lang = sc.read.csv("programming-languages.csv")
# Создадим список из имен языков программирования
pro_lang_list = [str(x[0]) for x in pro_lang.collect()]
pro_lang_list[:10]

['name',
 'A# .NET',
 'A# (Axiom)',
 'A-0 System',
 'A+',
 'A++',
 'ABAP',
 'ABC',
 'ABC ALGOL',
 'ABSET']

In [7]:
posts_sample = sc.read.format("xml").options(rowTag="row").load('posts_sample.xml')
posts_sample.take(1)

[Row(_AcceptedAnswerId=7, _AnswerCount=13, _Body="<p>I want to use a track-bar to change a form's opacity.</p>\n\n<p>This is my code:</p>\n\n<pre><code>decimal trans = trackBar1.Value / 5000;\nthis.Opacity = trans;\n</code></pre>\n\n<p>When I build the application, it gives the following error:</p>\n\n<blockquote>\n  <p>Cannot implicitly convert type <code>'decimal'</code> to <code>'double'</code></p>\n</blockquote>\n\n<p>I tried using <code>trans</code> and <code>double</code> but then the control doesn't work. This code worked fine in a past VB.NET project.</p>\n", _ClosedDate=None, _CommentCount=2, _CommunityOwnedDate=datetime.datetime(2012, 10, 31, 16, 42, 47, 213000), _CreationDate=datetime.datetime(2008, 7, 31, 21, 42, 52, 667000), _FavoriteCount=48, _Id=4, _LastActivityDate=datetime.datetime(2019, 7, 19, 1, 39, 54, 173000), _LastEditDate=datetime.datetime(2019, 7, 19, 1, 39, 54, 173000), _LastEditorDisplayName='Rich B', _LastEditorUserId=3641067, _OwnerDisplayName=None, _OwnerUs

In [8]:
def language_detection(x):

  tag = None
  for language in pro_lang_list:
    if "<" + language.lower() + ">" in x._Tags.lower():
      tag = language
      break
  if tag is None:
    return None
  return (x._Id, tag)


def check_date(x, year):

  start = datetime(year=year, month=1, day=1)
  end = datetime(year=year, month=12, day=31)
  CreationDate = x._CreationDate
  return CreationDate >= start and CreationDate <= end

final_result = {}
for year in range(2010, 2020):
  final_result[year] = posts_sample.rdd\
      .filter(lambda x: x._Tags is not None and check_date(x, year))\
      .map(language_detection)\
      .filter(lambda x: x is not None)\
      .keyBy(lambda x: x[1])\
      .aggregateByKey(
          0,
          lambda x, y: x + 1,
          lambda x1, x2: x1 + x2,
      )\
      .sortBy(lambda x: x[1], ascending=False)\
      .toDF()
  final_result[year] = final_result[year].select(col("_1").alias("Programming_language"),
                                                 col("_2").alias(f"Number_of_mentions_in_{year}")).limit(10)
  final_result[year].show()

+--------------------+--------------------------+
|Programming_language|Number_of_mentions_in_2010|
+--------------------+--------------------------+
|                Java|                        52|
|          JavaScript|                        44|
|                 PHP|                        42|
|              Python|                        25|
|         Objective-C|                        22|
|                   C|                        20|
|                Ruby|                        11|
|              Delphi|                         7|
|         AppleScript|                         3|
|                   R|                         3|
+--------------------+--------------------------+

+--------------------+--------------------------+
|Programming_language|Number_of_mentions_in_2011|
+--------------------+--------------------------+
|                Java|                        84|
|                 PHP|                        81|
|          JavaScript|                        69|

In [10]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [12]:
%cd drive/MyDrive/Colab/bigdata

/content/drive/MyDrive/Colab/bigdata


In [14]:
for year in final_result.keys():
    final_result[year].write.format("parquet").save(f"/content/drive/MyDrive/Colab/bigdata/top_{year}")