In [22]:
import pyspark
import os

from datetime import datetime
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import StructType, StructField, StringType, DateType

In [23]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.13.0 pyspark-shell'
sc = SparkSession.builder.appName("L2").master("local[*]").getOrCreate()
sc

In [25]:
!hadoop fs -put datasets/programming-languages.csv data/programming-languages.csv

In [27]:
pro_lang = sc.read.csv("data/programming-languages.csv")

In [28]:
pro_lang_list = [str(x[0]) for x in pro_lang.collect()]

In [29]:
pro_lang_list[:10]

['name',
 'A# .NET',
 'A# (Axiom)',
 'A-0 System',
 'A+',
 'A++',
 'ABAP',
 'ABC',
 'ABC ALGOL',
 'ABSET']

In [33]:
!hadoop fs -put datasets/posts_sample.xml data/posts_sample.xml

In [35]:
posts_sample = sc.read.format("xml").options(rowTag="row").load('data/posts_sample.xml')

In [36]:
posts_sample.take(1)

[Row(_AcceptedAnswerId=7, _AnswerCount=13, _Body="<p>I want to use a track-bar to change a form's opacity.</p>\n\n<p>This is my code:</p>\n\n<pre><code>decimal trans = trackBar1.Value / 5000;\nthis.Opacity = trans;\n</code></pre>\n\n<p>When I build the application, it gives the following error:</p>\n\n<blockquote>\n  <p>Cannot implicitly convert type <code>'decimal'</code> to <code>'double'</code></p>\n</blockquote>\n\n<p>I tried using <code>trans</code> and <code>double</code> but then the control doesn't work. This code worked fine in a past VB.NET project.</p>\n", _ClosedDate=None, _CommentCount=2, _CommunityOwnedDate=datetime.datetime(2012, 10, 31, 20, 42, 47, 213000), _CreationDate=datetime.datetime(2008, 8, 1, 2, 42, 52, 667000), _FavoriteCount=48, _Id=4, _LastActivityDate=datetime.datetime(2019, 7, 19, 5, 39, 54, 173000), _LastEditDate=datetime.datetime(2019, 7, 19, 5, 39, 54, 173000), _LastEditorDisplayName='Rich B', _LastEditorUserId=3641067, _OwnerDisplayName=None, _OwnerUser

Функция language_detection() приводит весь текст к нижнему регистру и исследует каждую строку на наличие названия языка программирования. Если язык обнаружен, создается кортеж с информацией о нем, в противном случае возвращается значение None. Функция check_date() выполняет проверку даты, поскольку требуется оценить промежуток времени от 2010 до 2020.

In [37]:
def language_detection(x):
    tag = None
    for language in pro_lang_list:
        if "<" + language.lower() + ">" in x._Tags.lower():
            tag = language
            break
    if tag is None:
        return None
    return (x._Id, tag)


def check_date(x, year): 
    start = datetime(year=year, month=1, day=1)
    end = datetime(year=year, month=12, day=31)
    CreationDate = x._CreationDate
    return CreationDate >= start and CreationDate <= end

Этот участок кода фильтрует строки, исключая пустые значения, и выбирает временной интервал с 2010 по 2019 год. Затем производится поиск языков программирования в каждой строке и удаляются пустые значения. В случае, если язык не обнаружен, он исключается из анализа. Далее подсчитывается количество упоминаний каждого языка программирования в каждом году и происходит сортировка по частоте упоминаний. Результат сортируется от наиболее часто упоминаемого к наименее.

In [38]:
final_result = {}
for year in range(2010, 2020):
    final_result[year] = posts_sample.rdd.filter(
        lambda x: x._Tags is not None and check_date(x, year)
    ).map(
        language_detection
    ).filter(
        lambda x: x is not None
    ).keyBy(
        lambda x: x[1]
    ).aggregateByKey(
        0,
        lambda x, y: x + 1,
        lambda x1, x2: x1 + x2
    ).sortBy(lambda x: x[1], ascending=False).toDF()
    final_result[year] = final_result[year].select(
        col("_1").alias("Programming_language"), 
        col("_2").alias(f"Number_of_mentions_in_{year}")
    ).limit(10)
    final_result[year].show()

+--------------------+--------------------------+
|Programming_language|Number_of_mentions_in_2010|
+--------------------+--------------------------+
|                Java|                        52|
|          JavaScript|                        44|
|                 PHP|                        42|
|              Python|                        25|
|         Objective-C|                        22|
|                   C|                        20|
|                Ruby|                        11|
|              Delphi|                         7|
|         AppleScript|                         3|
|                   R|                         3|
+--------------------+--------------------------+

+--------------------+--------------------------+
|Programming_language|Number_of_mentions_in_2011|
+--------------------+--------------------------+
|                 PHP|                        97|
|                Java|                        92|
|          JavaScript|                        82|

In [39]:
for year in final_result.keys():
    final_result[year].write.format("parquet").save(f"top_{year}")

In [41]:
!hadoop fs -ls top_2019

Found 2 items
-rwxr-xr-x   3 sumrako sumrako          0 2023-12-19 12:20 top_2019/_SUCCESS
-rwxr-xr-x   3 sumrako sumrako        885 2023-12-19 12:20 top_2019/part-00000-74e3cd30-a1d1-44a9-9391-9f3ab114485e-c000.snappy.parquet


In [7]:
sc.stop()