In [1]:
import findspark
findspark.init()

import pyspark
import random

In [2]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F


def _init_spark():
    spark = SparkSession.builder.appName("2TaskSparkSQL").getOrCreate()
    sc = spark.sparkContext
    return spark, sc


def load_file(filename="text.txt", spark=None, sc=None):
    
    RDD = sc.textFile(filename)
    
    header = ['project_name', 'page_title', 'num_requests', 'content_size']
    in_df = RDD.map(lambda x: x.split(" "))
    df2 = in_df.toDF(header)
    
    DF = df2.withColumn("num_requests", df2["num_requests"].cast(LongType())).withColumn("content_size", df2["content_size"].cast(LongType()))
    DF.printSchema()
    print(DF.show(10))
    return DF

In [3]:
if __name__ == '__main__':
    spark, sc = _init_spark()
    DF = load_file("data/pagecounts-20080301-000000", spark, sc)
    

root
 |-- project_name: string (nullable = true)
 |-- page_title: string (nullable = true)
 |-- num_requests: long (nullable = true)
 |-- content_size: long (nullable = true)

+------------+--------------------+------------+------------+
|project_name|          page_title|num_requests|content_size|
+------------+--------------------+------------+------------+
|          aa|%27Ir%C2%B7r%C2%B...|           1|           1|
|          aa| Enqlizxsh_-_English|           1|           1|
|          aa|           Main_Page|           2|           2|
|          aa|Special:Recentcha...|           1|           1|
|          aa|  Special:Statistics|           1|           1|
|          aa|        User:JAnDbot|           1|           1|
|          aa|          User:Koavf|           1|           1|
|          aa|        User:Purbo_T|           1|           1|
|          aa|         User:SieBot|           1|           1|
|          aa|       User:Siebrand|           1|           1|
+------------+----

In [4]:
    """  
    SQL and DataSet API
    1. Total number of elements.
    2. Complete list of project names (no repetitions).
    3. Total content size of project “en” (Wikipedia in English).
    4. Top 5 most visited pages of project “en”, and the number of visits for each.
    """
    print(type(DF))
    print(1)
    print(DF.count())
    
    proj_names = [x['project_name'] for x in DF.select('project_name').distinct().collect()]
    print(2)
    print(len(proj_names))
    
    numb_en_proj = DF.filter(DF["project_name"] == "en").groupBy().sum().collect()
    print(3)
    print(numb_en_proj[0][1])
    
    df3 = DF.filter(DF["project_name"] == "en")
    df4 = df3.sort("num_requests", ascending=False).take(5)
    top_5 = [(line["page_title"], line["num_requests"]) for line in df4]
    print(4)
    print(top_5)

<class 'pyspark.sql.dataframe.DataFrame'>
1
2896526
2
267
3
6862122
4
[('Special:Search', 648915), ('Main_Page', 197237), ('Special:Random', 71862), ('Leap_year', 11936), ('Ricin', 7531)]


In [5]:
DF.createOrReplaceTempView("Wikipedia")

In [6]:
sql1 = spark.sql("SELECT count(*) FROM Wikipedia")

In [7]:
sql1.first()[0]

2896526

In [8]:
sql2 = spark.sql("SELECT DISTINCT project_name FROM Wikipedia")

In [9]:
sql2.count()

267

In [10]:
sql3 = spark.sql("SELECT SUM(content_size) FROM Wikipedia WHERE project_name == 'en' ")

In [11]:
sql3.first()[0]

6862122

In [12]:
sql4 = spark.sql("SELECT page_title, num_requests FROM Wikipedia WHERE project_name =='en' ORDER BY num_requests DESC LIMIT 10")

In [13]:
sql4.show()

+--------------------+------------+
|          page_title|num_requests|
+--------------------+------------+
|      Special:Search|      648915|
|           Main_Page|      197237|
|      Special:Random|       71862|
|           Leap_year|       11936|
|               Ricin|        7531|
| Canine_reproduction|        7115|
|   Special:Watchlist|        6818|
|                Wiki|        5614|
|The_Rock_(enterta...|        3080|
|        Barack_Obama|        2868|
+--------------------+------------+



In [14]:
sc.stop()