# Задание
Сформировать отчёт с информацией о 10 наиболее популярных языках программирования по итогам года за период с 2010 по 2020 годы. Отчёт будет отражать динамику изменения популярности языков программирования и представлять собой набор таблиц "топ-10" для каждого года.

Получившийся отчёт сохранить в формате Apache Parquet.

Для выполнения задания используется RDD API.

In [56]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split, regexp_replace, year, count, row_number
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType
import xml.etree.ElementTree as ET

In [57]:
# Инициализация SparkSession
spark = SparkSession.builder.appName("ProgrammingLanguagesAnalysis").getOrCreate()

# Пути к данным
posts_path = "posts_sample.xml"
languages_path = "programming-languages.csv"

# Диапазон лет и топ-N
years = list(range(2010, 2021))
top_count = 10

# Подзадачи "Преобразовать любой файл набора данных stackoverflow в parquet формат c помощью Apache Spark.":
1. создать объект типа RDD для текстового файла /data/stackoverflow/posts.xml
2. добавить к каждому объекту индекс .zipWithIndex
3. убрать из коллекции строки 1, 2 и последню фильтрацией по индексу
4. из отфильтрованной коллекции строк получить коллекцию объектов scala.xml.Elem применением метода scala.xml.XML.loadString к каждой строке 5a) определить класс-схему таблицы с именами и типами, соответствующими атрибутам и типам значений строк в xml, используя case class 5b) преобразовать коллекцию scala.xml.Elem в коллекцию объектов вновь созданного типа
5. создать Dataset на основе коллекции объектов класса-схемы методом spark.createDataset
6. сохранить таблицу в parquet файл

In [58]:
# === 1. Создать объект типа RDD для текстового файла ===
rdd = spark.read.text(posts_path).rdd.map(lambda x: x[0])

In [59]:
# === 2. Добавить к каждому объекту индекс .zipWithIndex ===
rdd_indexed = rdd.zipWithIndex()

In [60]:
# === 3. Убрать из коллекции строки 1, 2 и последню фильтрацией по индексу ===
total_rows = rdd_indexed.count()
rdd_filtered = rdd_indexed.filter(lambda x: 1 < x[1] < total_rows - 1).map(lambda x: x[0])

In [61]:
# === 4. Преобразовать строки в XML-объекты ===
def parse_xml(line):
    try:
        root = ET.fromstring(line)
        return (
            root.attrib.get("Id", ""),
            root.attrib.get("CreationDate", ""),
            root.attrib.get("Tags", "")
        )
    except:
        return None

rdd_parsed = rdd_filtered.map(parse_xml).filter(lambda x: x is not None)

In [62]:
# === 5а. Определить класс-схему ===
schema = StructType([
    StructField("Id", StringType(), True),
    StructField("CreationDate", StringType(), True),
    StructField("Tags", StringType(), True)
])

In [63]:
# === 5б. Преобразовать RDD в DataFrame ===
df = spark.createDataFrame(rdd_parsed, schema)

In [64]:
# Обработка данных
df = df.withColumn("Year", year(col("CreationDate")))
df = df.withColumn("Tags", regexp_replace(col("Tags"), "[<>]", ""))
df = df.withColumn("Tags", split(col("Tags"), " "))
df_exploded = df.select("Year", explode(col("Tags")).alias("Language"))

# Загрузка списка языков программирования
languages_df = spark.read.option("header", "true").csv(languages_path)
languages_list = [row[0].lower() for row in languages_df.collect()]
df_filtered = df_exploded.filter((col("Year").isin(years)) & (col("Language").isin(languages_list))).cache()

# Подсчёт количества упоминаний языков по годам
df_count = df_filtered.groupBy("Year", "Language").agg(count("Language").alias("Count"))

# === 5. Создать Dataset на основе коллекции объектов класса-схемы ===
window_spec = Window.partitionBy("Year").orderBy(col("Count").desc())
df_top = df_count.withColumn("Rank", row_number().over(window_spec)).filter(col("Rank") <= top_count)

In [65]:
# === 6. Сохранение в Parquet ===
df_top.write.mode("overwrite").parquet("top10_languages.parquet")
df.write.mode("overwrite").parquet("parsed_posts.parquet")

# Вывод результатов
df_top.orderBy("Year", col("Count").desc()).show(len(years) * top_count)

+----+-----------+-----+----+
|Year|   Language|Count|Rank|
+----+-----------+-----+----+
|2010|        php|    7|   1|
|2010|     python|    4|   2|
|2010|          c|    3|   3|
|2010|       java|    2|   4|
|2010|         go|    1|   5|
|2010|        ksh|    1|   6|
|2011|       java|    7|   1|
|2011|        php|    6|   2|
|2011|     python|    5|   3|
|2011|          c|    5|   4|
|2011| javascript|    4|   5|
|2011|objective-c|    1|   6|
|2011|       ruby|    1|   7|
|2011|       cuda|    1|   8|
|2011|    haskell|    1|   9|
|2011|     delphi|    1|  10|
|2012|        php|   17|   1|
|2012| javascript|    7|   2|
|2012|       java|    5|   3|
|2012|          c|    5|   4|
|2012|     python|    4|   5|
|2012|      scala|    3|   6|
|2012|       ruby|    3|   7|
|2012|objective-c|    2|   8|
|2012|          r|    2|   9|
|2012|       bash|    1|  10|
|2013| javascript|   15|   1|
|2013|        php|   13|   2|
|2013|       java|   10|   3|
|2013|          r|    8|   4|
|2013|    