In [1]:
import pyspark
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import lit
from pyspark.sql.functions import *
from subprocess import call

In [2]:
spark = (
    SparkSession.builder
                .appName("Stack Overflow Data Wrangling")
                .config("spark.jars", "postgresql-42.2.8.jar") 
                .getOrCreate()
)


In [3]:
questions = spark.read.csv(
    "questions.csv",
    header=True, inferSchema=True
    ) 

In [4]:
questions = questions.withColumnRenamed('id', 'questions_id') 
questions = questions.withColumnRenamed('user_id', 'question_user_id')
questions = questions.withColumnRenamed('created_at', 'questions_created_at')
questions.show(5)

+--------------------+----------------+--------------------+--------------------+------------------+-----+----------+-------------+--------------------+
|        questions_id|question_user_id|               title|                body|accepted_answer_id|score|view_count|comment_count|questions_created_at|
+--------------------+----------------+--------------------+--------------------+------------------+-----+----------+-------------+--------------------+
|            54233315|         1118630|XPath parent node...|<p>I'm trying to ...|              null| null|      null|         null|                null|
|<p>So in Python I...|            null|                null|                null|              null| null|      null|         null|                null|
|<pre><code>for ph...|            null|                null|                null|              null| null|      null|         null|                null|
|       </code></pre>|            null|                null|                null| 

In [5]:
answers = spark.read.csv(
      "answers.csv",
    header=True, inferSchema=True
    )

In [26]:
answers = answers.withColumnRenamed('id', 'answer_id') 
answers = answers.withColumnRenamed('user_id', 'answer_user_id')
answers.show(5)

+--------------------+--------------+-----------+--------------------+-------------+---------------------+------------------+
|           answer_id|answer_user_id|question_id|        answers_body|answers_score|answers_comment_count|answers_created_at|
+--------------------+--------------+-----------+--------------------+-------------+---------------------+------------------+
|            53999517|       1771994|   53999275|<p>The <code>for....|         null|                 null|              null|
|<p><div class=""s...|          null|       null|                null|         null|                 null|              null|
|<div class=""snip...|          null|       null|                null|         null|                 null|              null|
|<pre class=""snip...|          null|       null|                null|         null|                 null|              null|
|  const quotes = ...|          null|       null|                null|         null|                 null|            

In [27]:
users = spark.read.csv(
    "users.csv",
    header=True, inferSchema=True
    )

In [28]:
users = users.withColumnRenamed('id', 'user_id')
# users.show(5)

# TASKS

# Select users from only one country of your choosing.

In [29]:

users.registerTempTable('users')
userCountry = spark.sql("SELECT * FROM users WHERE location = 'India'").show(5)

+--------+----------------+----------+--------------------+--------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+----+----+
| user_id|    display_name|reputation|         website_url|location|            about_me|views|up_votes|down_votes|           image_url|         created_at|         updated_at|_c12|_c13|
+--------+----------------+----------+--------------------+--------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+----+----+
|10260337|Pratik K. Tiwari|         1|http://pratiktiwa...|   India|<p>Hi, I am first...| null|    null|      null|                null|               null|               null|null|null|
| 8387608|           Abhay|         1|                null|   India|                null|    0|       0|         0|https://www.grava...|2017-07-29 22:20:14|2019-09-06 16:03:34|null|null|
| 4712224|    DevallaVamsi|         1|                null|   Ind

In [30]:
# users = users.withColumn('location', regexp_replace('location', 'USA', 'United States'))


In [31]:
indiaTb = users.filter(users.location.contains('India'))

In [32]:
indiaTb = indiaTb.withColumn('location', lower(col('location')))

In [33]:
indiaTb = indiaTb.withColumn('location', regexp_replace('location', r"[,]\s*\w*\s*[,]", ','))
indiaTb.show(10)

+--------+------------------+----------+--------------------+--------------------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+----+----+
| user_id|      display_name|reputation|         website_url|            location|            about_me|views|up_votes|down_votes|           image_url|         created_at|         updated_at|_c12|_c13|
+--------+------------------+----------+--------------------+--------------------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+----+----+
| 8357266|            suryan|         7|https://twitter.c...|    bangalore, india|                null|    8|       0|         0|https://www.grava...|2017-07-24 10:55:23|2019-06-19 05:00:16|null|null|
| 6504306|             A.Raw|         4|                null|    new delhi, india|                null|   10|       0|         0|https://i.stack.i...|2016-06-23 12:58:03|2019-10-12 06:59:32|null|n

# Extract the country and city into new columns

In [34]:


indialoc = indiaTb.withColumn('location', split(indiaTb.location, ',')) \
  .select('user_id', 'display_name', 'views', 'reputation', 'updated_at', 'location', 'created_at', element_at(col('location'),-2).alias('city'), element_at(col('location'), -1).alias('country'))

indialoc.show(5)



+--------+-------------+-----+----------+-------------------+-------------------+-------------------+---------+-------+
| user_id| display_name|views|reputation|         updated_at|           location|         created_at|     city|country|
+--------+-------------+-----+----------+-------------------+-------------------+-------------------+---------+-------+
| 8357266|       suryan|    8|         7|2019-06-19 05:00:16|[bangalore,  india]|2017-07-24 10:55:23|bangalore|  india|
| 6504306|        A.Raw|   10|         4|2019-10-12 06:59:32|[new delhi,  india]|2016-06-23 12:58:03|new delhi|  india|
|10260743|Kartik Juneja| null|         3|               null|[gharaunda,  india]|               null|gharaunda|  india|
| 4689205|       sd5869|    5|         1|2019-09-18 14:36:03|[new delhi,  india]|2015-03-19 10:20:21|new delhi|  india|
|10262756|      Ali Mir| null|         5|               null|[jalandhar,  india]|               null|jalandhar|  india|
+--------+-------------+-----+----------

# Join this with the questions and only pick questions with at least 20 view_counts.


In [35]:
#Join this with the questions and only pick questions with at least 20 view_counts.
indialoc.registerTempTable('indialoc')
questions.registerTempTable('questions')

quesIndia = spark.sql("SELECT * FROM indialoc LEFT JOIN questions ON (indialoc.user_id = questions.question_user_id) WHERE questions.view_count >= 20")

In [36]:
quesIndia.show(5)

+-------+------------+-----+----------+-------------------+-------------------+-------------------+---------+-------+------------+----------------+-----+----+------------------+-------+----------+-------------+--------------------+
|user_id|display_name|views|reputation|         updated_at|           location|         created_at|     city|country|questions_id|question_user_id|title|body|accepted_answer_id|  score|view_count|comment_count|questions_created_at|
+-------+------------+-----+----------+-------------------+-------------------+-------------------+---------+-------+------------+----------------+-----+----+------------------+-------+----------+-------------+--------------------+
|3273751| user3273751|    5|        16|2019-07-09 07:35:59|[bangalore,  india]|2014-02-05 05:16:17|bangalore|  india|           R|         3273751|46609|6452|              3447|1810631|     45933|         6382|                3447|
+-------+------------+-----+----------+-------------------+-------------

In [37]:
# quesIndia.filter(quesIndia.display_name.isNotNull()).show(5)

In [38]:
answers = answers.withColumnRenamed('body', 'answers_body') 
answers = answers.withColumnRenamed('score', 'answers_score') 
answers = answers.withColumnRenamed('comment_count', 'answers_comment_count') 
answers = answers.withColumnRenamed('created_at', 'answers_created_at') 

# Join the answers to the results of (3)

In [39]:
quesIndia.registerTempTable('quesIndia')
answers.registerTempTable('answers')

ansRes = spark.sql("SELECT * FROM quesIndia LEFT JOIN answers ON (answers.question_id = quesIndia.questions_id)")

In [40]:
# ansRes.show(5)

# Use this to return the minimum updated_at time.

In [41]:
min_ansRes = ansRes.select([min('updated_at')])
min_ansRes.show()

+-------------------+
|    min(updated_at)|
+-------------------+
|2019-07-09 07:35:59|
+-------------------+



# Use spark to write the results into this table with the snippet below.


In [42]:

ansRes.write.format("jdbc").options(
    url='jdbc:postgresql://localhost/postgres',
    driver='org.postgresql.Driver',
    user='postgres',
    password='postgres1234',
    dbtable='stackoverflow_filtered.results'
).save(mode='append')


Py4JJavaError: An error occurred while calling o299.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 129 in stage 66.0 failed 1 times, most recent failure: Lost task 129.0 in stage 66.0 (TID 1568, localhost, executor driver): java.sql.BatchUpdateException: Batch entry 0 INSERT INTO stackoverflow_filtered.results ("user_id","display_name","views","reputation","updated_at","location","created_at","city","country","questions_id","question_user_id","title","body","accepted_answer_id","score","view_count","comment_count","questions_created_at","answer_id","answer_user_id","question_id","answers_body","answers_score","answers_comment_count","answers_created_at") VALUES ('3273751','user3273751','5','16','2019-07-09 07:35:59','{"bangalore"," india"}','2014-02-05 05:16:17','bangalore',' india','R','3273751','46609','6452','3447','1810631','45933','6382','3447','<p>Just for clarification; <code>P','p','R','r','S','s</code> are called identifiers',' and you need <code>float</code> value to assign them with(since they are of type<code>float</code>).</p>') was aborted: ERROR: column "user_id" is of type integer but expression is of type character varying
  Hint: You will need to rewrite or cast the expression.
  Position: 403  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:148)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:837)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1546)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:672)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.postgresql.util.PSQLException: ERROR: column "user_id" is of type integer but expression is of type character varying
  Hint: You will need to rewrite or cast the expression.
  Position: 403
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2497)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2233)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:310)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:834)
	... 16 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:935)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:933)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:933)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:834)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:68)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO stackoverflow_filtered.results ("user_id","display_name","views","reputation","updated_at","location","created_at","city","country","questions_id","question_user_id","title","body","accepted_answer_id","score","view_count","comment_count","questions_created_at","answer_id","answer_user_id","question_id","answers_body","answers_score","answers_comment_count","answers_created_at") VALUES ('3273751','user3273751','5','16','2019-07-09 07:35:59','{"bangalore"," india"}','2014-02-05 05:16:17','bangalore',' india','R','3273751','46609','6452','3447','1810631','45933','6382','3447','<p>Just for clarification; <code>P','p','R','r','S','s</code> are called identifiers',' and you need <code>float</code> value to assign them with(since they are of type<code>float</code>).</p>') was aborted: ERROR: column "user_id" is of type integer but expression is of type character varying
  Hint: You will need to rewrite or cast the expression.
  Position: 403  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:148)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:837)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1546)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:672)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
Caused by: org.postgresql.util.PSQLException: ERROR: column "user_id" is of type integer but expression is of type character varying
  Hint: You will need to rewrite or cast the expression.
  Position: 403
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2497)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2233)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:310)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:834)
	... 16 more


In [None]:
ansRes.columns


# In your Jupyter notebook, state the difference between views and materialized views

The difference between a view and a materialised view is that a view serves as a virtual table with the query passed where as the materialised view serves as a physical store table for the query passed.

# If you were to give an award to one user, who will it be? And why?


I would give it to user because he is the only user in my table.