In [2]:
import pyspark
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import lit
from pyspark.sql.functions import *
from subprocess import call

ModuleNotFoundError: No module named 'pyspark'

In [2]:
spark = (
    SparkSession.builder
                .appName("Stack Overflow Data Wrangling")
                .config("spark.jars", "postgresql-42.2.8.jar") 
                .getOrCreate()
)


In [3]:
questions = spark.read.csv(
    "questions.csv",
    header=True, inferSchema=True, multiLine=True, escape = '"'
    ) 

In [19]:
questions.printSchema()

root
 |-- id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- body: string (nullable = true)
 |-- accepted_answer_id: integer (nullable = true)
 |-- score: integer (nullable = true)
 |-- view_count: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- created_at: timestamp (nullable = true)



In [20]:
questions = questions.withColumnRenamed('id', 'questions_id') 
questions = questions.withColumnRenamed('user_id', 'question_user_id')
questions = questions.withColumnRenamed('created_at', 'questions_created_at')
questions.show(5)

+------------+----------------+--------------------+--------------------+------------------+-----+----------+-------------+--------------------+
|questions_id|question_user_id|               title|                body|accepted_answer_id|score|view_count|comment_count|questions_created_at|
+------------+----------------+--------------------+--------------------+------------------+-----+----------+-------------+--------------------+
|    54233315|         1118630|XPath parent node...|<p>I'm trying to ...|          54233368|    1|       134|            4| 2019-01-17 09:59:47|
|    54233145|         7984274| Is this a java BUG?|<p>why the follow...|          54234312|   -2|       132|            3| 2019-01-17 09:50:12|
|    54233331|         1877002|Different results...|<p>I am new to li...|          54233375|   -1|        26|            0| 2019-01-17 10:00:17|
|    54233149|        10927076|Using eval as pro...|<p>I know there a...|          54233257|    1|        49|            2| 2019-0

In [21]:
answers = spark.read.csv(
      "answers.csv",
    header=True, inferSchema=True,multiLine=True, escape = '"'
    )

In [22]:
answers = answers.withColumnRenamed('id', 'answer_id') 
answers = answers.withColumnRenamed('user_id', 'answer_user_id')
answers.show(5)

+---------+--------------+-----------+--------------------+-----+-------------+-------------------+
|answer_id|answer_user_id|question_id|                body|score|comment_count|         created_at|
+---------+--------------+-----------+--------------------+-----+-------------+-------------------+
| 53999517|       1771994|   53999275|<p>The <code>for....|    0|            0|2019-01-01 22:42:02|
| 54005064|        948762|   54004882|<p>This is becaus...|    0|            0|2019-01-02 10:55:21|
| 53995281|       5159168|   53995029|<p>The simplest m...|    0|            0|2019-01-01 12:04:45|
| 54000208|       7964527|   54000128|<p>Using <code>ro...|    0|            0|2019-01-02 01:12:19|
| 54005110|       9653876|   54003879|<p>Check your <st...|    0|            0|2019-01-02 10:58:13|
+---------+--------------+-----------+--------------------+-----+-------------+-------------------+
only showing top 5 rows



In [23]:
users = spark.read.csv(
    "users.csv",
    header=True, inferSchema=True,multiLine=True, escape = '"'
    )

In [24]:
users = users.withColumnRenamed('id', 'user_id')
# users.show(5)

In [25]:
users.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- display_name: string (nullable = true)
 |-- reputation: integer (nullable = true)
 |-- website_url: string (nullable = true)
 |-- location: string (nullable = true)
 |-- about_me: string (nullable = true)
 |-- views: string (nullable = true)
 |-- up_votes: integer (nullable = true)
 |-- down_votes: integer (nullable = true)
 |-- image_url: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- updated_at: string (nullable = true)
 |-- _c12: timestamp (nullable = true)
 |-- _c13: timestamp (nullable = true)



# TASKS

# Select users from only one country of your choosing.

In [26]:

users.registerTempTable('users')
userCountry = spark.sql("SELECT * FROM users WHERE location = 'India'").show(5)

+--------+----------------+----------+--------------------+--------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+----+----+
| user_id|    display_name|reputation|         website_url|location|            about_me|views|up_votes|down_votes|           image_url|         created_at|         updated_at|_c12|_c13|
+--------+----------------+----------+--------------------+--------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+----+----+
|10260337|Pratik K. Tiwari|         1|http://pratiktiwa...|   India|<p>Hi, I am first...|    2|       0|         0|                null|2018-08-22 14:40:05|2019-04-25 08:34:33|null|null|
| 8387608|           Abhay|         1|                null|   India|                null|    0|       0|         0|https://www.grava...|2017-07-29 22:20:14|2019-09-06 16:03:34|null|null|
| 4712224|    DevallaVamsi|         1|                null|   Ind

In [27]:
# users = users.withColumn('location', regexp_replace('location', 'USA', 'United States'))


In [28]:
indiaTb = users.filter(users.location.contains('India'))

In [29]:
indiaTb = indiaTb.withColumn('location', lower(col('location')))

In [30]:
indiaTb = indiaTb.withColumn('location', regexp_replace('location', r"[,]\s*\w*\s*[,]", ','))
indiaTb.show(10)

+--------+------------------+----------+--------------------+--------------------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+----+----+
| user_id|      display_name|reputation|         website_url|            location|            about_me|views|up_votes|down_votes|           image_url|         created_at|         updated_at|_c12|_c13|
+--------+------------------+----------+--------------------+--------------------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+----+----+
| 8357266|            suryan|         7|https://twitter.c...|    bangalore, india|                null|    8|       0|         0|https://www.grava...|2017-07-24 10:55:23|2019-06-19 05:00:16|null|null|
| 6504306|             A.Raw|         4|                null|    new delhi, india|                null|   10|       0|         0|https://i.stack.i...|2016-06-23 12:58:03|2019-10-12 06:59:32|null|n

# Extract the country and city into new columns

In [31]:


indialoc = indiaTb.withColumn('location', split(indiaTb.location, ',')) \
  .select('user_id', 'display_name', 'views', 'reputation', 'updated_at', 'location', 'created_at', element_at(col('location'),-2).alias('city'), element_at(col('location'), -1).alias('country'))

indialoc.show(5)



+--------+-------------+-----+----------+-------------------+-------------------+-------------------+---------+-------+
| user_id| display_name|views|reputation|         updated_at|           location|         created_at|     city|country|
+--------+-------------+-----+----------+-------------------+-------------------+-------------------+---------+-------+
| 8357266|       suryan|    8|         7|2019-06-19 05:00:16|[bangalore,  india]|2017-07-24 10:55:23|bangalore|  india|
| 6504306|        A.Raw|   10|         4|2019-10-12 06:59:32|[new delhi,  india]|2016-06-23 12:58:03|new delhi|  india|
|10260743|Kartik Juneja|    2|         3|2019-01-27 13:05:45|[gharaunda,  india]|2018-08-22 16:14:32|gharaunda|  india|
| 4689205|       sd5869|    5|         1|2019-09-18 14:36:03|[new delhi,  india]|2015-03-19 10:20:21|new delhi|  india|
|10262756|      Ali Mir|    9|         5|2019-10-12 06:38:20|[jalandhar,  india]|2018-08-23 04:10:49|jalandhar|  india|
+--------+-------------+-----+----------

# Join this with the questions and only pick questions with at least 20 view_counts.


In [32]:
quesIndia = indialoc.join(questions, indialoc.user_id == questions.question_user_id)
quesIndia.show(5)


+-------+------------+-----+----------+-------------------+--------------------+-------------------+--------------+-------+------------+----------------+--------------------+--------------------+------------------+-----+----------+-------------+--------------------+
|user_id|display_name|views|reputation|         updated_at|            location|         created_at|          city|country|questions_id|question_user_id|               title|                body|accepted_answer_id|score|view_count|comment_count|questions_created_at|
+-------+------------+-----+----------+-------------------+--------------------+-------------------+--------------+-------+------------+----------------+--------------------+--------------------+------------------+-----+----------+-------------+--------------------+
| 348851|     defiant|  257|      1063|2019-10-12 23:19:37| [bangalore,  india]|2010-05-24 10:36:13|     bangalore|  india|    55115359|          348851|Is there anyway I...|<p>I am using Rma...|    

In [33]:
quesIndia.filter(quesIndia.view_count >= 20).show(5)

+-------+------------+-----+----------+-------------------+--------------------+-------------------+--------------+-------+------------+----------------+--------------------+--------------------+------------------+-----+----------+-------------+--------------------+
|user_id|display_name|views|reputation|         updated_at|            location|         created_at|          city|country|questions_id|question_user_id|               title|                body|accepted_answer_id|score|view_count|comment_count|questions_created_at|
+-------+------------+-----+----------+-------------------+--------------------+-------------------+--------------+-------+------------+----------------+--------------------+--------------------+------------------+-----+----------+-------------+--------------------+
| 348851|     defiant|  257|      1063|2019-10-12 23:19:37| [bangalore,  india]|2010-05-24 10:36:13|     bangalore|  india|    55115359|          348851|Is there anyway I...|<p>I am using Rma...|    

In [34]:
# #Join this with the questions and only pick questions with at least 20 view_counts.
# indialoc.registerTempTable('indialoc')
# questions.registerTempTable('questions')

# quesIndia = spark.sql("SELECT * FROM indialoc LEFT JOIN questions ON (indialoc.user_id = questions.question_user_id) WHERE questions.view_count >= 20")

In [35]:
# quesIndia.filter(quesIndia.display_name.isNotNull()).show(5)

In [36]:
answers = answers.withColumnRenamed('body', 'answers_body') 
answers = answers.withColumnRenamed('score', 'answers_score') 
answers = answers.withColumnRenamed('comment_count', 'answers_comment_count') 
answers = answers.withColumnRenamed('created_at', 'answers_created_at') 

# Join the answers to the results of (3)

In [37]:
ansRes = quesIndia.join(answers, answers.question_id == quesIndia.questions_id, how='left')
ansRes.show(5)

+-------+--------------+-----+----------+-------------------+------------------+-------------------+--------+-------+------------+----------------+--------------------+--------------------+------------------+-----+----------+-------------+--------------------+---------+--------------+-----------+--------------------+-------------+---------------------+-------------------+
|user_id|  display_name|views|reputation|         updated_at|          location|         created_at|    city|country|questions_id|question_user_id|               title|                body|accepted_answer_id|score|view_count|comment_count|questions_created_at|answer_id|answer_user_id|question_id|        answers_body|answers_score|answers_comment_count| answers_created_at|
+-------+--------------+-----+----------+-------------------+------------------+-------------------+--------+-------+------------+----------------+--------------------+--------------------+------------------+-----+----------+-------------+-----------

In [38]:
ansRes.count()

39763

In [46]:
from pyspark.sql.types import IntegerType
ansRes = ansRes.withColumn("views", ansRes["views"].cast(IntegerType()))

In [47]:
ansRes.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- display_name: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- reputation: integer (nullable = true)
 |-- updated_at: string (nullable = true)
 |-- location: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- created_at: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- questions_id: integer (nullable = true)
 |-- question_user_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- body: string (nullable = true)
 |-- accepted_answer_id: integer (nullable = true)
 |-- score: integer (nullable = true)
 |-- view_count: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- questions_created_at: timestamp (nullable = true)
 |-- answer_id: integer (nullable = true)
 |-- answer_user_id: integer (nullable = true)
 |-- question_id: integer (nullable = true)
 |-- answers_body: string (nullable = true)
 |-- an

In [48]:
# quesIndia.registerTempTable('quesIndia')
# answers.registerTempTable('answers')

# ansRes = spark.sql("SELECT * FROM quesIndia LEFT JOIN answers ON (answers.question_id = quesIndia.questions_id)")

In [49]:
# ansRes.show(5)

# Use this to return the minimum updated_at time.

In [50]:
min_ansRes = ansRes.select([min('updated_at')])
min_ansRes.show()

+-------------------+
|    min(updated_at)|
+-------------------+
|2019-01-02 04:42:48|
+-------------------+



# Use spark to write the results into this table with the snippet below.


In [54]:

ansRes.write.format("jdbc").options(
    url='jdbc:postgresql://localhost/postgres',
    driver='org.postgresql.Driver',
    user='postgres',
    password='postgres1234',
    dbtable='stackoverflow_filtered.results'
).save(mode='append')


In [55]:
ansRes.columns


['user_id',
 'display_name',
 'views',
 'reputation',
 'updated_at',
 'location',
 'created_at',
 'city',
 'country',
 'questions_id',
 'question_user_id',
 'title',
 'body',
 'accepted_answer_id',
 'score',
 'view_count',
 'comment_count',
 'questions_created_at',
 'answer_id',
 'answer_user_id',
 'question_id',
 'answers_body',
 'answers_score',
 'answers_comment_count',
 'answers_created_at']

# In your Jupyter notebook, state the difference between views and materialized views

The difference between a view and a materialised view is that a view serves as a virtual table with the query passed where as the materialised view serves as a physical store table for the query passed.