In [1]:
import pandas as pd 
import json
import matplotlib.pyplot as plt 

#Graph network imports
from graphframes import *
from pyspark import *
from pyspark.sql import *
import numpy as np
from pyspark.ml.linalg import *
from pyspark.ml.linalg import *
from pyspark.sql.types import * 
from pyspark.sql.functions import *

from pyspark.sql.functions import udf #user defined function
from pyspark.sql.types import * #Import types == IntegerType, StringType etc.

import nltk

In [2]:
#import statements
from pyspark.sql import SparkSession

#create Spark session
spark = SparkSession.builder.enableHiveSupport().appName('Final_project_read_write').getOrCreate()

conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '15g'), ('spark.app.name', 'Final_project_read_write'), ('spark.executor.cores', '10'), ('spark.cores.max', '10'), ('spark.driver.memory','20g')])

#print spark configuration settings
#spark.sparkContext.getConf().getAll()

In [3]:
modeling_data = spark.read.parquet('modeling_data')

In [4]:
modeling_data.printSchema()

root
 |-- id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- inCitations_count: integer (nullable = true)
 |-- outCitations_count: integer (nullable = true)
 |-- abstract_wcount: integer (nullable = true)
 |-- title_wcount: integer (nullable = true)
 |-- abstract_tfidf: vector (nullable = true)
 |-- title_tfidf: vector (nullable = true)
 |-- SJR: string (nullable = true)
 |-- author_count: integer (nullable = true)
 |-- fieldsOfStudyVec: vector (nullable = true)
 |-- sourcesVec: vector (nullable = true)



In [4]:
modeling_data.count() - modeling_data.dropna().count()

192481

In [5]:
import pyspark.sql.functions as f
from functools import reduce

modeling_data.where(reduce(lambda x, y: x | y, (f.col(x).isNull() for x in modeling_data.columns))).show(5)

+--------------------+----+-----------------+------------------+---------------+------------+--------------+--------------------+---+------------+-----------------+-------------+
|                  id|year|inCitations_count|outCitations_count|abstract_wcount|title_wcount|abstract_tfidf|         title_tfidf|SJR|author_count| fieldsOfStudyVec|   sourcesVec|
+--------------------+----+-----------------+------------------+---------------+------------+--------------+--------------------+---+------------+-----------------+-------------+
|57d4efa8939189a64...|null|                0|                 0|              0|          12|(262144,[],[])|(262144,[2196,954...|124|           1|(2179,[12],[1.0])|(4,[0],[1.0])|
|5c1c3f390fd575dd1...|null|                0|                 0|              0|          12|(262144,[],[])|(262144,[13957,21...|124|           1| (2179,[9],[1.0])|(4,[0],[1.0])|
|c8351f487ab3c8caf...|null|                0|                 0|              0|           2|(262144,[],[

In [7]:
from pyspark.sql.functions import isnan, when, count, col

modeling_data.select([count(when(col(c).isNull(), c)).alias(c) for c in modeling_data.columns]).toPandas().T

Unnamed: 0,0
id,0
year,192481
inCitations_count,0
outCitations_count,0
abstract_wcount,0
title_wcount,0
abstract_tfidf,0
title_tfidf,0
SJR,0
author_count,0


In [6]:
year_stats = modeling_data.select('year').summary()

median_year = int(year_stats.collect()[5].year)
print('the median year is = ', median_year)

the median year is =  2008


In [7]:
#Impute median year
modeling_data = modeling_data.na.fill({'year': median_year})

#Convert to integer type column
modeling_data = modeling_data.withColumn("year", modeling_data["year"].cast(IntegerType()))
modeling_data = modeling_data.withColumn("SJR", modeling_data["SJR"].cast(IntegerType()))

In [8]:
from pyspark.ml.feature import VectorAssembler
#gather feature vector and identify features
assembler = VectorAssembler(inputCols = ['year', 'outCitations_count','abstract_wcount','title_wcount','abstract_tfidf',\
                                         'title_tfidf','SJR','author_count','fieldsOfStudyVec', 'sourcesVec'],
                            outputCol = 'features')


modeling_data = assembler.transform(modeling_data)

In [9]:
#split data into train and test
train_df, test_df = modeling_data.select('id','inCitations_count', 'features').randomSplit([0.8, 0.2], seed=42)
train_df.show(1)

Py4JJavaError: An error occurred while calling o173.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: 
Aborting TaskSet 5.0 because task 0 (partition 0)
cannot run anywhere due to node and executor blacklist.
Most recent failure:
Lost task 0.1 in stage 5.0 (TID 29, hd06.rcc.local, executor 15): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$4: (struct<year_double_VectorAssembler_b0d9646f5575:double,outCitations_count_double_VectorAssembler_b0d9646f5575:double,abstract_wcount_double_VectorAssembler_b0d9646f5575:double,title_wcount_double_VectorAssembler_b0d9646f5575:double,abstract_tfidf:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,title_tfidf:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,SJR_double_VectorAssembler_b0d9646f5575:double,author_count_double_VectorAssembler_b0d9646f5575:double,fieldsOfStudyVec:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>,sourcesVec:struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:624)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "keep". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:287)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:255)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:255)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$4.apply(VectorAssembler.scala:144)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$4.apply(VectorAssembler.scala:143)
	... 22 more


Blacklisting behavior can be configured via spark.blacklist.*.

	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1890)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:929)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2111)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:740)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2081)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2102)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2121)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [6]:
print('Train Length = ', train_df.count())
print('Test Length = ', test_df.count())

Train Length =  7986889
Test Length =  1994110


In [None]:
%%time
from pyspark.ml.regression import LinearRegression

#Elastic Net
lr = LinearRegression(featuresCol = 'features', labelCol='inCitations_count', regParam=0.3, maxIter=10)
lrm = lr.fit(train_df)

#coefficients
print("Coefficients: " + str(lrm.coefficients))
print("Intercept: " + str(lrm.intercept))

#model summary
print("RMSE: %f" % lrm.summary.rootMeanSquaredError)
print("r2: %f" % lrm.summary.r2)

#p-values are not provided in this model for the solver being used
#print("pValues: " + str(lrm.summary.pValues))

In [None]:
#make predictions
predictions = lrm.transform(test_df)


from itertools import chain
attrs = sorted(
    (attr["idx"], attr["name"]) for attr in (chain(*predictions
        .schema[lrm.summary.featuresCol]
        .metadata["ml_attr"]["attrs"].values())))

#[(name, lrm.summary.pValues[idx]) for idx, name in attrs]

from pyspark.ml.evaluation import RegressionEvaluator

eval = RegressionEvaluator(labelCol="inCitations_count", predictionCol="prediction", metricName="rmse")

# Root Mean Square Error
rmse = eval.evaluate(predictions)
print("RMSE: %.3f" % rmse)

# Mean Square Error
mse = eval.evaluate(predictions, {eval.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = eval.evaluate(predictions, {eval.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = eval.evaluate(predictions, {eval.metricName: "r2"})
print("r2: %.3f" %r2)