In [1]:
!pip install findspark
import findspark
findspark.init()



In [2]:
from datetime import datetime
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, DateType

In [3]:
from pyspark.sql import SparkSession
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.13.0 pyspark-shell'

spark_session = SparkSession\
    .builder\
    .getOrCreate()

In [4]:
spark_session

In [5]:
 sc = spark_session._sc

In [6]:
!head /mnt/data/programming-languages.csv

name,wikipedia_url
A# .NET,https://en.wikipedia.org/wiki/A_Sharp_(.NET)
A# (Axiom),https://en.wikipedia.org/wiki/A_Sharp_(Axiom)
A-0 System,https://en.wikipedia.org/wiki/A-0_System
A+,https://en.wikipedia.org/wiki/A%2B_(programming_language)
A++,https://en.wikipedia.org/wiki/A%2B%2B
ABAP,https://en.wikipedia.org/wiki/ABAP
ABC,https://en.wikipedia.org/wiki/ABC_(programming_language)
ABC ALGOL,https://en.wikipedia.org/wiki/ABC_ALGOL
ABSET,https://en.wikipedia.org/wiki/ABSET


## Read csv file

In [7]:
prog_lang_schema = StructType([
    StructField("name", StringType(), False),
    StructField("wikipedia_url", StringType(), False),
])

In [8]:
prog_langs_df = spark_session.read.csv("file:///mnt/data/programming-languages.csv", schema=prog_lang_schema)

In [9]:
prog_langs_df

DataFrame[name: string, wikipedia_url: string]

In [10]:
prog_langs_df.head()

Row(name='name', wikipedia_url='wikipedia_url')

In [11]:
prog_langs_list = [str(row[0]) for row in prog_langs_df.select('name').collect()]

In [12]:
prog_langs_list[:4]

['name', 'A# .NET', 'A# (Axiom)', 'A-0 System']

## Read xml file

In [13]:
!head -n 4 /mnt/data/posts_sample.xml

���<?xml version="1.0" encoding="utf-8"?>
<posts>
  <row Id="4" PostTypeId="1" AcceptedAnswerId="7" CreationDate="2008-07-31T21:42:52.667" Score="630" ViewCount="42817" Body="&lt;p&gt;I want to use a track-bar to change a form's opacity.&lt;/p&gt;&#xA;&#xA;&lt;p&gt;This is my code:&lt;/p&gt;&#xA;&#xA;&lt;pre&gt;&lt;code&gt;decimal trans = trackBar1.Value / 5000;&#xA;this.Opacity = trans;&#xA;&lt;/code&gt;&lt;/pre&gt;&#xA;&#xA;&lt;p&gt;When I build the application, it gives the following error:&lt;/p&gt;&#xA;&#xA;&lt;blockquote&gt;&#xA;  &lt;p&gt;Cannot implicitly convert type &lt;code&gt;'decimal'&lt;/code&gt; to &lt;code&gt;'double'&lt;/code&gt;&lt;/p&gt;&#xA;&lt;/blockquote&gt;&#xA;&#xA;&lt;p&gt;I tried using &lt;code&gt;trans&lt;/code&gt; and &lt;code&gt;double&lt;/code&gt; but then the control doesn't work. This code worked fine in a past VB.NET project.&lt;/p&gt;&#xA;" OwnerUserId="8" LastEditorUserId="3641067" LastEditorDisplayName="Rich B" LastEditDate="2019-07-19T01:39:54.1

In [14]:
posts_sample_df = spark_session.read.format("xml").options(rowTag="row").load('file:///mnt/data/posts_sample.xml')

In [15]:
posts_sample_df.head()

Row(_AcceptedAnswerId=7, _AnswerCount=13, _Body="<p>I want to use a track-bar to change a form's opacity.</p>\n\n<p>This is my code:</p>\n\n<pre><code>decimal trans = trackBar1.Value / 5000;\nthis.Opacity = trans;\n</code></pre>\n\n<p>When I build the application, it gives the following error:</p>\n\n<blockquote>\n  <p>Cannot implicitly convert type <code>'decimal'</code> to <code>'double'</code></p>\n</blockquote>\n\n<p>I tried using <code>trans</code> and <code>double</code> but then the control doesn't work. This code worked fine in a past VB.NET project.</p>\n", _ClosedDate=None, _CommentCount=2, _CommunityOwnedDate=datetime.datetime(2012, 10, 31, 16, 42, 47, 213000), _CreationDate=datetime.datetime(2008, 7, 31, 21, 42, 52, 667000), _FavoriteCount=48, _Id=4, _LastActivityDate=datetime.datetime(2019, 7, 19, 1, 39, 54, 173000), _LastEditDate=datetime.datetime(2019, 7, 19, 1, 39, 54, 173000), _LastEditorDisplayName='Rich B', _LastEditorUserId=3641067, _OwnerDisplayName=None, _OwnerUse

In [22]:
def define_language(row):
    language_tag = None
    for lang in prog_langs_list:
        if '<' + lang.lower() + '>' in row._Tags.lower():
            language_tag = lang
            break
    if language_tag is None:
        return None
    return (row._Id, row._CreationDate.year, language_tag)

In [23]:
datetime(year=2020, month=1, day=1).year

2020

In [61]:
def check_date_range(row):
    left_border = datetime(year=2010, month=1, day=1)
    right_border = datetime(year=2021, month=1, day=1)
    created_date = row._CreationDate
    return created_date > left_border and created_date < right_border

In [62]:
def flatten(row):
    key, value = row
    language, year = key
    n_samples = value
    return language, year, n_samples

In [63]:
# 1. Select rows with not null tag and an appropriate date
# 2. (xml ...) -> (id, date, labguage)
# 3. Remove rows with tags without language
# 4. [(id, data, language)] -> {(language, date): (id, date, language)}
# 5. {(language, date): n_samples}
# 6. [(language, date, n_samples)]
top_languages = posts_sample_df.rdd\
    .filter(lambda row: row._Tags is not None and check_date_range(row))\
    .map(define_language)\
    .filter(lambda row: row is not None)\
    .keyBy(lambda row: (row[2], row[1]))\
    .aggregateByKey(
        0,
        lambda acc, value: acc + 1,
        lambda acc1, acc2: acc1 + acc2,
    )\
    .sortBy(lambda row: row[1], ascending=False)\
    .map(flatten)

In [67]:
per_year_report = {}
for year in range(2010, 2020 + 1):
    # 1. Take rows for a particular year
    # 2. Remove year from all rows
    # 3. Sort by the number of posts
    # 4. Convert to dataframe
    # 5. Change columns names
    # 6. Take 10 most popular languages
    # 7. Convert list of top languages to RDD
    # 8. Converr to dataframe
    try:
        result = top_languages\
            .filter(lambda row: int(row[1]) == year)\
            .map(lambda row: (row[0], row[2]))\
            .sortBy(lambda row: row[1], ascending=False)\
            .toDF()\
            .select(col('_1').alias('ProgrammingLanguage'), col('_2').alias('NumberOfPosts'))\
            .take(10)
        result = sc.parallelize(result).toDF()
        per_year_report[year] = result
    except:
        continue

In [68]:
# There are no posts in 2020
per_year_report

{2010: DataFrame[ProgrammingLanguage: string, NumberOfPosts: bigint],
 2011: DataFrame[ProgrammingLanguage: string, NumberOfPosts: bigint],
 2012: DataFrame[ProgrammingLanguage: string, NumberOfPosts: bigint],
 2013: DataFrame[ProgrammingLanguage: string, NumberOfPosts: bigint],
 2014: DataFrame[ProgrammingLanguage: string, NumberOfPosts: bigint],
 2015: DataFrame[ProgrammingLanguage: string, NumberOfPosts: bigint],
 2016: DataFrame[ProgrammingLanguage: string, NumberOfPosts: bigint],
 2017: DataFrame[ProgrammingLanguage: string, NumberOfPosts: bigint],
 2018: DataFrame[ProgrammingLanguage: string, NumberOfPosts: bigint],
 2019: DataFrame[ProgrammingLanguage: string, NumberOfPosts: bigint]}

In [69]:
for year, report in per_year_report.items():
    print(f'Report for {year} year.')
    report.show()
    print('\n\n\n')

Report for 2010 year.
+-------------------+-------------+
|ProgrammingLanguage|NumberOfPosts|
+-------------------+-------------+
|               Java|           52|
|         JavaScript|           44|
|                PHP|           42|
|             Python|           25|
|        Objective-C|           23|
|                  C|           20|
|               Ruby|           11|
|             Delphi|            7|
|        AppleScript|            3|
|                  R|            3|
+-------------------+-------------+





Report for 2011 year.
+-------------------+-------------+
|ProgrammingLanguage|NumberOfPosts|
+-------------------+-------------+
|                PHP|           97|
|               Java|           92|
|         JavaScript|           82|
|             Python|           35|
|        Objective-C|           33|
|                  C|           24|
|               Ruby|           17|
|             Delphi|            8|
|               Perl|            8|
|              

In [70]:
# We can see that Python started to dominate other languages over time

In [71]:
for year, report in per_year_report.items():
    report.write.parquet(f"{year}_top_languages_report.parquet")
print('All reports saved!')

All reports saved!
