In [8]:

!pip3 install pyspark==3.1.2




In [9]:

import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql import SparkSession

import os
import sys

In [10]:
# Задаем переменные окружения для корректной работы некоторых функций
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# Задаем переменную окружения для парсинга xml
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.17.0 pyspark-shell'


In [11]:
spark = SparkSession.builder.getOrCreate()
spark

In [12]:


!wget https://git.ai.ssau.ru/tk/big_data/raw/branch/master/data/posts_sample.xml

--2024-04-13 21:48:16--  https://git.ai.ssau.ru/tk/big_data/raw/branch/master/data/posts_sample.xml
Resolving git.ai.ssau.ru (git.ai.ssau.ru)... 91.222.131.161
Connecting to git.ai.ssau.ru (git.ai.ssau.ru)|91.222.131.161|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 74162295 (71M) [text/plain]
Saving to: ‘posts_sample.xml.1’


2024-04-13 21:48:55 (1.94 MB/s) - ‘posts_sample.xml.1’ saved [74162295/74162295]



**1. Найти велосипед с максимальным временем пробега.**

In [13]:
postsData = spark.read.format('xml').option('rowTag', 'row').option("timestampFormat", 'y/M/d H:m:s').load('posts_sample.xml')



In [14]:
# Задаем промеуток с 1 января 2010 года по 31 декабря 2020 года
dates = ("2010-01-01",  "2020-12-31")

# Выбираем посты в заданный промежуток
posts_by_date = postsData.filter(F.col("_CreationDate").between(*dates))
posts_by_date.show(10)

+-----------------+------------+--------------------+-----------+-------------+--------------------+--------------------+--------------+-------+--------------------+--------------------+----------------------+-----------------+-----------------+------------+---------+-----------+------+-----+------+----------+
|_AcceptedAnswerId|_AnswerCount|               _Body|_ClosedDate|_CommentCount| _CommunityOwnedDate|       _CreationDate|_FavoriteCount|    _Id|   _LastActivityDate|       _LastEditDate|_LastEditorDisplayName|_LastEditorUserId|_OwnerDisplayName|_OwnerUserId|_ParentId|_PostTypeId|_Score|_Tags|_Title|_ViewCount|
+-----------------+------------+--------------------+-----------+-------------+--------------------+--------------------+--------------+-------+--------------------+--------------------+----------------------+-----------------+-----------------+------------+---------+-----------+------+-----+------+----------+
|             null|        null|<p>No. (And more ...|       null

In [15]:
!wget https://git.ai.ssau.ru/tk/big_data/raw/branch/master/data/programming-languages.csv

--2024-04-13 21:49:15--  https://git.ai.ssau.ru/tk/big_data/raw/branch/master/data/programming-languages.csv
Resolving git.ai.ssau.ru (git.ai.ssau.ru)... 91.222.131.161
Connecting to git.ai.ssau.ru (git.ai.ssau.ru)|91.222.131.161|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 40269 (39K) [text/plain]
Saving to: ‘programming-languages.csv’


2024-04-13 21:49:16 (354 KB/s) - ‘programming-languages.csv’ saved [40269/40269]



In [16]:
languagesData = spark.read.format('csv').option('header', 'true').option("inferSchema", True).load('programming-languages.csv').dropna()


In [17]:

print("Схема")
languagesData.printSchema()

print("Первые 5 элементов")
languagesData.show(n = 5)

print("Описание датасета")
languagesData.describe().show()

print("Количество элементов")
print(languagesData.count())


Схема
root
 |-- name: string (nullable = true)
 |-- wikipedia_url: string (nullable = true)

Первые 5 элементов
+----------+--------------------+
|      name|       wikipedia_url|
+----------+--------------------+
|   A# .NET|https://en.wikipe...|
|A# (Axiom)|https://en.wikipe...|
|A-0 System|https://en.wikipe...|
|        A+|https://en.wikipe...|
|       A++|https://en.wikipe...|
+----------+--------------------+
only showing top 5 rows

Описание датасета
+-------+--------+--------------------+
|summary|    name|       wikipedia_url|
+-------+--------+--------------------+
|  count|     699|                 699|
|   mean|    null|                null|
| stddev|    null|                null|
|    min|@Formula|https://en.wikipe...|
|    max|xHarbour|https://en.wikipe...|
+-------+--------+--------------------+

Количество элементов
699


In [18]:
# Названия языков
language_names = [str(x[0]) for x in languagesData.collect()]
language_names

['A# .NET',
 'A# (Axiom)',
 'A-0 System',
 'A+',
 'A++',
 'ABAP',
 'ABC',
 'ABC ALGOL',
 'ABSET',
 'ABSYS',
 'ACC',
 'Accent',
 'Ace DASL',
 'ACL2',
 'ACT-III',
 'Action!',
 'ActionScript',
 'Ada',
 'Adenine',
 'Agda',
 'Agilent VEE',
 'Agora',
 'AIMMS',
 'Alef',
 'ALF',
 'ALGOL 58',
 'ALGOL 60',
 'ALGOL 68',
 'ALGOL W',
 'Alice',
 'Alma-0',
 'AmbientTalk',
 'Amiga E',
 'AMOS',
 'AMPL',
 'Apex (Salesforce.com)',
 'APL',
 "App Inventor for Android's visual block language",
 'AppleScript',
 'Arc',
 'ARexx',
 'Argus',
 'AspectJ',
 'Assembly language',
 'ATS',
 'Ateji PX',
 'AutoHotkey',
 'Autocoder',
 'AutoIt',
 'AutoLISP / Visual LISP',
 'Averest',
 'AWK',
 'Axum',
 'B',
 'Babbage',
 'Bash',
 'BASIC',
 'bc',
 'BCPL',
 'BeanShell',
 'Batch (Windows/Dos)',
 'Bertrand',
 'BETA',
 'Bigwig',
 'Bistro',
 'BitC',
 'BLISS',
 'Blockly',
 'BlooP',
 'Blue',
 'Boo',
 'Boomerang',
 'Bourne shell (including',
 'bash and',
 'ksh )',
 'BREW',
 'BPEL',
 'C',
 'C--',
 'C++ – ISO/IEC 14882',
 'C# – ISO/IEC

In [19]:

# Определение языка в теге постаа
def includes_name(x):
    tag = None
    for name in language_names:
        n = '<' + name.lower() + '>'
        if n in str(x._Tags).lower():
            tag = name
            break
    if tag is None:
        tag = 'No'

    return (x[6], tag)

In [20]:
# Преобразование в RDD
posts_by_date_rdd = posts_by_date.rdd.map(includes_name).filter(lambda x: x[1] != 'No')


In [21]:
# Группировка данных по году и языку программирования, подсчет количества записей и сортировка по убыванию
posts_by_date_rdd_group = posts_by_date_rdd.keyBy(lambda row: (row[0].year, row[1])).aggregateByKey(0, lambda x, y: x + 1, lambda x1, x2: x1 + x2).sortBy(lambda x: x[1], ascending=False).collect()


In [22]:
# Создание списка years_list с годами от 2010 до 2020 в обратном порядке
years_list = [i for i in range(2010, 2021)][::-1]


In [23]:
# Формирование списка df_by_years с данными по топ-10 языкам программирования для каждого года из years_list
df_by_years = []
for year in years_list:
    df_by_years.extend([row for row in posts_by_date_rdd_group if row[0][0] == year][:10])


In [24]:

# Создание Row-шаблона row_template с полями 'Year', 'Language', 'Count'
row_template = Row('Year', 'Language', 'Count')


In [25]:

# Создание DataFrame result_df на основе данных из df_by_years и вывод его содержимого
result_df = spark.createDataFrame([row_template(*x, y) for x, y in df_by_years])
result_df.show()

+----+----------+-----+
|Year|  Language|Count|
+----+----------+-----+
|2019|    Python|  162|
|2019|JavaScript|  131|
|2019|      Java|   95|
|2019|       PHP|   59|
|2019|         R|   36|
|2019|         C|   14|
|2019|        Go|    9|
|2019|      Dart|    9|
|2019|    MATLAB|    9|
|2019|      Ruby|    8|
|2018|    Python|  214|
|2018|JavaScript|  196|
|2018|      Java|  145|
|2018|       PHP|   99|
|2018|         R|   63|
|2018|         C|   24|
|2018|     Scala|   22|
|2018|TypeScript|   21|
|2018|PowerShell|   13|
|2018|      Bash|   12|
+----+----------+-----+
only showing top 20 rows



In [26]:

# Получившийся отчёт сохранить в формате Apache Parquet
result_df.write.parquet("10_popular_language.parquet")