In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=57d45430f35737f64f48c7c3e5de52ab48c9f0664ff2cfaed2eadf039e63abb6
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [3]:
import re
from typing import List

import pyspark.sql as sql

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType, IntegerType, ArrayType, StringType
from pyspark.sql.functions import udf, explode, rank, desc
from pyspark.sql.functions import col, max, sum, lower, countDistinct

In [4]:
spark = SparkSession \
    .builder \
    .appName("L2_ApacheSpart_Reports") \
    .config("spark.jars.packages", "com.databricks:spark-xml_2.12:0.13.0")\
    .getOrCreate()

In [5]:
posts_data = spark.read.format('xml').options(rowTag='row').load('posts_sample.xml')

prog_lang_data = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y H:m')\
.csv("programming-languages.csv")

In [6]:
def get_tags(tags_string): #Функции для обработки содержимого столбцов с тегами и датой последней активности поста
    if tags_string is None:
        return []

    pattern = r'<(.+?)>'
    tags = re.findall(pattern, tags_string)

    return tags

def get_year(date_and_time):
    return date_and_time.year

get_tags_udf = udf(get_tags, ArrayType(StringType()))
get_year_udf = udf(get_year, IntegerType())

In [7]:
# Применение приведенных выше функция для преобразования данных в нужный вид
posts_data_simplified = posts_data \
                        .withColumn("tags", get_tags_udf(posts_data["_Tags"])) \
                        .withColumn("year", get_year_udf(posts_data["_LastActivityDate"]))

# Выбор только нужных столбцов
posts_data_simplified = posts_data_simplified.select(
    col("tags"),
    col("year"),
    col("_ViewCount").alias("views")
)

# Отображение полученных данных
posts_data_simplified.show()

+--------------------+----+------+
|                tags|year| views|
+--------------------+----+------+
|[c#, floating-poi...|2019| 42817|
|[html, css, inter...|2019| 18214|
|                  []|2017|  NULL|
|[c#, .net, datetime]|2019|555183|
|[c#, datetime, ti...|2019|149445|
|                  []|2018|  NULL|
|[html, browser, t...|2019|176405|
|        [.net, math]|2018|123231|
|                  []|2010|  NULL|
|                  []|2010|  NULL|
|                  []|2010|  NULL|
|                  []|2010|  NULL|
|                  []|2010|  NULL|
|                  []|2010|  NULL|
|                  []|2010|  NULL|
|                  []|2010|  NULL|
|                  []|2010|  NULL|
|                  []|2010|  NULL|
|                  []|2013|  NULL|
|                  []|2010|  NULL|
+--------------------+----+------+
only showing top 20 rows



#Удаление тегов, не соответствующих языкам программирования

In [8]:
# Добавление столбца, содержащего название языка программирования, но в нижнем регистре
prog_lang_data_modified = prog_lang_data.withColumn("tag_lowercase", lower(prog_lang_data["name"]))

# Разбиение массива тегов на отдельные столбцы
posts_data_filtered = posts_data_simplified.select("year", explode("tags").alias("tag"), "views")

# Объединение таблиц для того, чтобы оставить только те строки, где поле tag у постов является языком программирования
posts_data_filtered = posts_data_filtered.join(prog_lang_data_modified,
                                    (posts_data_filtered["tag"] == prog_lang_data_modified["tag_lowercase"]),
                                    "inner")

# Выбор нужных столбцов
posts_data_filtered = posts_data_filtered.select("year", "tag", "views")

# Отображение результата
posts_data_filtered.show()

+----+-----------+-----+
|year|        tag|views|
+----+-----------+-----+
|2010|       java|  132|
|2010|        php| 1258|
|2015|       ruby| 9649|
|2010|          c| 2384|
|2015|        php| 1987|
|2010|     python| 3321|
|2010| javascript|  128|
|2010|applescript|  477|
|2010|        php| 1748|
|2010|        php|  998|
|2013| javascript| 2095|
|2010|        sed|  447|
|2015|     python| 6558|
|2015|       java|  214|
|2015|       ruby|  214|
|2013|objective-c|  852|
|2010| javascript|  179|
|2010|          r| 6709|
|2010|        php|   78|
|2010| javascript| 1280|
+----+-----------+-----+
only showing top 20 rows



In [9]:
# Группировка по году последней активности и тегам, суммирование всех просмотров для каждого языка программирования в пределах одного года
posts_data_sorted = posts_data_filtered.groupBy("year", "tag").agg(sum("views").alias("total_views"))

# Сортировка по году и количеству просмотров
posts_data_sorted = posts_data_sorted.orderBy("year", desc("total_views"))

# Отображение результата
posts_data_sorted.show()

+----+------------+-----------+
|year|         tag|total_views|
+----+------------+-----------+
|2008|        java|      11532|
|2008|        ruby|       1843|
|2008|         x++|       1363|
|2009|      python|      32219|
|2009|  javascript|      17139|
|2009|           c|      16356|
|2009|        java|      13533|
|2009|         php|      12876|
|2009|        bash|       4410|
|2009|     haskell|       3992|
|2009|       xpath|       3869|
|2009| objective-c|       3671|
|2009|      delphi|       3477|
|2009|        ruby|       2844|
|2009|  powershell|        536|
|2009|actionscript|        318|
|2010|        java|      53333|
|2010|      matlab|      51865|
|2010| objective-c|      43878|
|2010|         php|      39730|
+----+------------+-----------+
only showing top 20 rows



#Составление итогового отчета (N самых популярных языков программирования за год)

In [10]:
# Разбиение по году последней активности для нумерации языков программирования в соответствии с их популярностью
window_spec = Window.partitionBy("year").orderBy(posts_data_sorted["total_views"].desc())

# Добавление столбца rank, определяющего положение языка программирования в топе
posts_data_with_ranks = posts_data_sorted.withColumn("rank", rank().over(window_spec))

# Оставляем только первые N языков программирования для каждого года
languages_per_year = 10
posts_data_sorted_result = posts_data_with_ranks.filter(posts_data_with_ranks["rank"] <= languages_per_year)

# Удаление столбца rank
posts_data_sorted_result = posts_data_sorted_result.drop(col("rank"))

# Сортировка по году и количеству просмотров
posts_data_sorted_result = posts_data_sorted_result.orderBy("year", desc("total_views"))

# Отображение результата
posts_data_sorted_result.show()

+----+-----------+-----------+
|year|        tag|total_views|
+----+-----------+-----------+
|2008|       java|      11532|
|2008|       ruby|       1843|
|2008|        x++|       1363|
|2009|     python|      32219|
|2009| javascript|      17139|
|2009|          c|      16356|
|2009|       java|      13533|
|2009|        php|      12876|
|2009|       bash|       4410|
|2009|    haskell|       3992|
|2009|      xpath|       3869|
|2009|objective-c|       3671|
|2009|     delphi|       3477|
|2010|       java|      53333|
|2010|     matlab|      51865|
|2010|objective-c|      43878|
|2010|        php|      39730|
|2010| javascript|      37059|
|2010|     python|      25930|
|2010|       ruby|      15864|
+----+-----------+-----------+
only showing top 20 rows



In [14]:
posts_data_sorted_result.write.parquet("/output")