# Smolkina E. V., 6408
# Lab assignment No. 2 for the BigData course

In [1]:
from datetime import datetime
from pyspark.sql.types import StructType, StructField, StringType, DateType
from pyspark.sql.functions import asc, desc, rank, col
from pyspark.sql.window import Window

In [2]:
from pyspark.sql import SparkSession
import os

# Request the spark-xml package through PYSPARK_SUBMIT_ARGS; this is set
# before the session is created so the setting is in place when Spark starts.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.12:0.13.0 pyspark-shell'

spark_session = SparkSession.builder.appName("Lab2").getOrCreate()
# Use the public `sparkContext` property rather than the private `_sc` attribute.
sc = spark_session.sparkContext
# Bare expression: lets the notebook display the session summary.
spark_session



In [5]:
# Explicit schema for the languages CSV: two string columns.
# NOTE: no `header` option is passed to the reader, so the file's header line
# is loaded as an ordinary first data row.
progLangSchema = StructType([
    StructField(columnName, StringType(), False)
    for columnName in ("Language", "Url_language")
])
dfProgLangs = spark_session.read.schema(progLangSchema).csv("programming-languages.csv")
# Peek at the first loaded row.
dfProgLangs.head()

Row(Language='name', Url_language='wikipedia_url')

In [11]:
# The first row of dfProgLangs carries the CSV column titles, so every row
# equal to it is dropped; only column 0 (the language name) is kept.
firstRow = dfProgLangs.rdd.first()
languageRows = dfProgLangs.rdd.filter(lambda record: record != firstRow)
progLangsList = languageRows.map(lambda record: record[0]).collect()
# Preview the first ten language names.
progLangsList[:10]

['A# .NET',
 'A# (Axiom)',
 'A-0 System',
 'A+',
 'A++',
 'ABAP',
 'ABC',
 'ABC ALGOL',
 'ABSET',
 'ABSYS']

In [12]:
# Load the posts sample through the "xml" data source, using "row" as the
# row tag.
postsReader = spark_session.read.format("xml").option("rowTag", "row")
dfPostsSample = postsReader.load('posts_sample.xml')
# Print the DataFrame summary, a spacer, and then the first record.
print(dfPostsSample)
print("\n\n")
print(dfPostsSample.first())

DataFrame[_AcceptedAnswerId: bigint, _AnswerCount: bigint, _Body: string, _ClosedDate: timestamp, _CommentCount: bigint, _CommunityOwnedDate: timestamp, _CreationDate: timestamp, _FavoriteCount: bigint, _Id: bigint, _LastActivityDate: timestamp, _LastEditDate: timestamp, _LastEditorDisplayName: string, _LastEditorUserId: bigint, _OwnerDisplayName: string, _OwnerUserId: bigint, _ParentId: bigint, _PostTypeId: bigint, _Score: bigint, _Tags: string, _Title: string, _ViewCount: bigint]



Row(_AcceptedAnswerId=7, _AnswerCount=13, _Body="<p>I want to use a track-bar to change a form's opacity.</p>\n\n<p>This is my code:</p>\n\n<pre><code>decimal trans = trackBar1.Value / 5000;\nthis.Opacity = trans;\n</code></pre>\n\n<p>When I build the application, it gives the following error:</p>\n\n<blockquote>\n  <p>Cannot implicitly convert type <code>'decimal'</code> to <code>'double'</code></p>\n</blockquote>\n\n<p>I tried using <code>trans</code> and <code>double</code> but then the control doesn't

In [39]:
def DefineLanguage(row):
    """Map a post row to ``(id, language, year, view_count)`` using its tags.

    The upper-cased ``row._Tags`` string is searched for ``<LANGUAGE>`` for
    each name in the module-level ``progLangsList``, in list order; the first
    name found wins.

    Args:
        row: Object exposing the ``_Id``, ``_Tags``, ``_CreationDate`` and
            ``_ViewCount`` attributes.

    Returns:
        ``(row._Id, language, row._CreationDate.year, row._ViewCount)`` for
        the first matching language, or ``None`` when the row has no tags or
        no listed language is tagged.
    """
    if row._Tags is None:
        return None
    # Upper-case the tags once per row instead of once per candidate language.
    tagsUpper = row._Tags.upper()
    for language in progLangsList:
        if '<' + language.upper() + '>' in tagsUpper:
            return (row._Id, language, row._CreationDate.year, row._ViewCount)
    return None

def IsDateInRange(row):
    """Tell whether a post was created in the years 2010 through 2020.

    Args:
        row: Object whose ``_CreationDate`` attribute exposes a ``year``.

    Returns:
        ``True`` when the creation year lies in 2010..2020 inclusive,
        otherwise ``False``.
    """
    # Comparing years keeps all of 31 December 2020 in range; an upper bound
    # of datetime(2020, 12, 31) would cut that day off right after midnight.
    return 2010 <= row._CreationDate.year <= 2020

In [89]:
# Sum the view counts per (year, language) for tagged posts created in range.
# NOTE: the "Count" column therefore holds summed `_ViewCount` values, not a
# number of posts.
topLanguagesPerYear = (
    dfPostsSample.rdd
    .filter(lambda row: row._Tags is not None and IsDateInRange(row))
    # DefineLanguage yields (id, language, year, views) or None.
    .map(DefineLanguage)
    .filter(lambda row: row is not None)
    # Key directly by (year, language) with the views as the value; this
    # replaces the former keyBy step followed by a re-mapping step.
    .map(lambda row: ((row[2], row[1]), row[3]))
    .reduceByKey(lambda a, b: a + b)
    # Flatten ((year, language), total) into (year, language, total).
    .map(lambda row: (row[0][0], row[0][1], row[1]))
    .toDF(('Year', 'Language', 'Count'))
)

topLanguagesPerYear.show()

+----+------------+------+
|Year|    Language| Count|
+----+------------+------+
|2010|      Python| 59392|
|2010|  JavaScript|316131|
|2010|           R| 11087|
|2011| Objective-C|218762|
|2011|  JavaScript|806948|
|2013|  JavaScript|607937|
|2013|      Python|159360|
|2013|ActionScript|    30|
|2014|  JavaScript|505138|
|2014|      Python|238294|
|2014|         AWK| 12890|
|2016|         PHP| 98009|
|2016|  PowerShell| 36175|
|2016|        Curl|  9386|
|2016|       Scala| 11725|
|2019|  JavaScript| 13006|
|2019|          Go|   533|
|2019|        Curl|   150|
|2019|       Scala|   351|
|2015|      Python|278450|
+----+------------+------+
only showing top 20 rows



In [110]:
# Rank languages within each year by descending "Count" and keep ranks 1-10.
# NOTE: rank() gives tied counts the same rank, so a year may keep more than
# ten rows when counts tie.
window_spec = Window.partitionBy("Year").orderBy(desc("Count"))
topLanguagesRanks = topLanguagesPerYear.withColumn("rank", rank().over(window_spec))
# The helper "rank" column is dropped once the filter has been applied.
result = topLanguagesRanks.filter(col("rank") <= 10).drop("rank")
result.show()

+----+-----------+-------+
|Year|   Language|  Count|
+----+-----------+-------+
|2010|        PHP|1184584|
|2010|       Java| 563211|
|2010| JavaScript| 316131|
|2010|Objective-C|  97009|
|2010|       Ruby|  76001|
|2010|          C|  66587|
|2010|     Python|  59392|
|2010|     MATLAB|  51865|
|2010|AppleScript|  32305|
|2010|     Delphi|  11817|
|2011| JavaScript| 806948|
|2011|       Java| 388524|
|2011|        PHP| 243646|
|2011|          C| 238277|
|2011|Objective-C| 218762|
|2011|     Python| 195016|
|2011|       Bash|  60805|
|2011|       Ruby|  33037|
|2011|       Perl|  24465|
|2011|     MATLAB|  18816|
+----+-----------+-------+
only showing top 20 rows



In [111]:
# Save the result as Parquet under "result", using the "overwrite" save mode.
result.write.parquet("result", mode="overwrite")