In [25]:
#import necessary modules
from pyspark import *
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [26]:
#Set environment variables for py4jjava error
import os
import sys
os.environ['PYSPARK_PYTHON']= sys.executable
os.environ['PYSPARK_DRIVER_PYTHON']= sys.executable
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = "notebook"

In [27]:
# Create a SparkSession object
spark= SparkSession.builder.appName("Amazon_get_recomm").getOrCreate()

In [28]:
spark.conf.set("spark.sql.shuffle.partitions", "10")
spark.conf.set("spark.sql.execution.arrow.timeout", "300")
spark.conf.set("spark.sql.analyzer.failAmbiguousSelfJoin", "false")


In [29]:
# Read in the data from a .txt file and create a DataFrame
metadf = spark.read.options().text(r"C:\Users\vrushalideshmukh\Documents\BFL_Internship_Docs\amazon-meta.txt")

In [30]:
# Create an empty list called 'listing'
listing = []
#Open the file at the specified path and encode it using utf-8
# fileopen = open(r"C:\Users\vrushalideshmukh\Documents\BFL_Internship_Docs\amazon-meta.txt",encoding='utf-8')
fileopen = open(r"C:\Users\vrushalideshmukh\Documents\BFL_Internship_Docs\amazon-meta.txt",encoding='utf-8')
#Start an infinite loop until there are no more lines to read in the file
while True:
    line = fileopen.readline()
    if not line:
        break
    else: listing.append(line)

In [31]:
import re
# create an empty list
dict=[]
# initialize flag to 0
flag = 0
# initialize newstr variable to empty string
newstr = ""

# loop through each line in the listing
for i in listing:
    # check if the line contains "Id"
    if "Id" in i:
        # check if flag is 0
        if flag == 0:
            newstr= "" + i
            flag = 1
            continue
        # check if flag is 1
        elif flag == 1:
            # remove trailing newline character
            if newstr.endswith('\n'):
                newstr = newstr[:-1]
            dict.append([newstr])
            newstr=""
    newstr += i

In [32]:
newdict=[]
newdict= dict
# create a spark dataframe from first 50 tuples
metadf = spark.createDataFrame(dict[:1000],['value'])

In [33]:
# import pyspark.sql.functions module and alias it as F
import pyspark.sql.functions as F

# Define regular expressions as string literals
product_id_regex = '^Id:\s*(\d+)'
product_ASIN_regex = '.*(?:ASIN:\s*)(\d+|\w+)'
product_title_regex = 'title:\s*(.*)'
product_group_regex = '.*(?:group:\s*)([\w]*)'
sales_rank_regex = '.*(?:salesrank:\s*)(\d*)'
similar_products_regex = '.*(?:similar:\s*[1-9]\s*)([\w ?]*)'
categories_regex = 'categories:\s*[1-9]\s*([\s\S]*?)(?=reviews|\Z)'
# reviews_regex = '.*(?:reviews:\\s*)(\\d+),(\\d+),([\\d.]+),(.*)'
total_reviews_regex = '.*(?:reviews:\s*)total:\s*(\d+)'
avg_rating_regex = '.*avg rating:\s*([\d.]+)'
# custdata_regex = '\n*\r*\s*(\d*-\d*-\d*)\s*cutomer:\s*([A-Za-z0-9]*)\s*rating:\s*(\d*)\s*votes:\s*(\d*)\s*helpful:\s*(\d*)\r*\n*'

# list of metadata field names
metadata_fields = ['id', 'asins', 'titles', 'groups', 'salesranks', 'categories', 'similars','total_reviews', 'avg_rating']

# loop through each regular expression and field name
for i, regex in enumerate([product_id_regex, product_ASIN_regex, product_title_regex, product_group_regex, sales_rank_regex, categories_regex, similar_products_regex, total_reviews_regex, avg_rating_regex]):
    # get the corresponding field name from the metadata_fields list
    field_name = metadata_fields[i]
    # apply regular expression and create a new column with the field name
    metadf = metadf.withColumn(field_name, F.regexp_extract('value', regex, 1))

In [34]:
# Split the 'similars' column by whitespace and convert to list
split_col = F.split(metadf['similars'], '\\s+')

# Define a user-defined function (UDF) to convert the split column to a list
# This lambda function simply takes a single input argument x and returns it unchanged.
# The purpose of the UDF is to apply a transformation to the input data, and the transformation is defined by the lambda function.
# The UDF is being used to convert the split column (which is a column of strings) to a list. 
# The lambda function simply returns the input string unchanged, so the UDF applies no transformation to the data. 
# However, because we have specified that the return type of the UDF is ArrayType(StringType())
# Hence, the resulting output column will be a column of lists of strings.
to_list_udf = F.udf(lambda x: x, ArrayType(StringType()))

# Apply the UDF to the split column and create a new column with the resulting list
metadf = metadf.withColumn('similars_list', to_list_udf(split_col))

#The resulting 'similars_list' column will contain a list of strings that were previously separated by whitespace in the 'similars' column.
# Drop the original 'similars' column since we no longer need it
metadf=metadf.drop('similars')

# Split categories by '\n' and create new columns to separate categories
metadf = metadf.withColumn('categories_list', F.split(metadf['categories'], '\n'))
metadf = metadf.withColumn('category_1', metadf['categories_list'][0])
metadf = metadf.withColumn('category_2', metadf['categories_list'][1])
metadf = metadf.withColumn('category_3', metadf['categories_list'][2])
metadf = metadf.withColumn('category_4', metadf['categories_list'][3])
metadf = metadf.withColumn('category_5', metadf['categories_list'][4])

# Use a conditional statement to check if there is a 6th category, and create a new column for it if there is
metadf = metadf.withColumn('category_6', F.when(F.size(metadf['categories_list']) > 1, metadf['categories_list'][5]).otherwise(None))

# Drop the 'categories_list' column since we no longer need it
# metadf = metadf.drop('categories_list')

# Drop the original 'categories' column since we've extracted the relevant information into new columns
metadf=metadf.drop('categories')

# Drop the 'value' column since it was only used temporarily to extract the metadata fields

metadf=metadf.drop('value')

In [35]:
# Using select casting the string columns as the required data type

metadf=  metadf.withColumn("ID",col("id").cast("int"))
metadf = metadf.withColumn("salesranks", col("salesranks").cast("double"))
metadf = metadf.withColumn("avg_rating", col("avg_rating").cast("float"))
metadf = metadf.withColumn("total_reviews", col("total_reviews").cast("int"))
metadf.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- asins: string (nullable = true)
 |-- titles: string (nullable = true)
 |-- groups: string (nullable = true)
 |-- salesranks: double (nullable = true)
 |-- total_reviews: integer (nullable = true)
 |-- avg_rating: float (nullable = true)
 |-- similars_list: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- categories_list: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- category_1: string (nullable = true)
 |-- category_2: string (nullable = true)
 |-- category_3: string (nullable = true)
 |-- category_4: string (nullable = true)
 |-- category_5: string (nullable = true)
 |-- category_6: string (nullable = true)



I am trying to make a getRecommendation function for the amazon snap dataset.
Use the getid(id) function and metadf to make the following functions:
1. Make getContext(id) function. 
The function should return three dataframes 
first dataframe of id, asin, title, group, similars_list and categories_list 
second dataframe of id, asin, title, group, similars_list ( similars_list  exploded into multiple rows)
third dataframe of id, asin, title, group, categories_list (categories_list exploded into multiple rows)

2. Make getSimilars(id) function.
The function should call the second dataframe containing the similars_list from getContext(id) function.
The function should then match the asins in the similars_list with asins in the metadf and return a dataframe containing id, asin, title, group, categories_list  (categories_list exploded into multiple rows) 



3. Make a getRelevantCategory function as getRelevantCategory(group, top n, score) that gives categories as output 
It is further divided into two functions: in_category function and out_category function
The getRelevantCategory function first calls the getContext(id) function.
The getContext(id) function fetches the Asin, title, group, similars_list and categories_list corresponding to the input id.
for in_category function:


Make a getRecommendation(id, group, categories, top_n) function
1. call getContext(id), getSimilars(id), getRelevantCategories(group) functions inside the getRecommendation function.
2. Input condition:
Id cannot be null
3. if the input categories is Not Null, match and filter the input categories with that obtained from the getRelevantCategories 
4. display the top_n asin and title as the top recommendations


For out group recommendations, 
A.similardf-> filter those ids with similar categories and group 
B. also, match the title obtained from getcontext with others in the same column using n-gram model
C. find out the score as per helpful_percentage *  avg_rating * popularity_percentage. (popularity_percentage= salesrank*100/salesrankmax- scoring on the basis of distribution) 
higher the score, better the visibility. 

In [12]:
# #Returns in categories
# def getContext(id):
#     # get the row matching the input ID
#     row = metadf.filter(col('ID') == id).select('ID', 'asins', 'titles', 'groups', 'similars_list', 'categories_list').first()
    
#     # create first dataframe
#     context_df = spark.createDataFrame([row], ['ID', 'asins', 'titles', 'groups', 'similars_list', 'categories_list'])
    
#     # create second dataframe with similars_list exploded
#     similars_context_df = metadf.filter(col('ID') == id).select('ID', 'asins', 'titles', 'groups', explode('similars_list').alias('similar_asins'))
    
#     # create third dataframe with categories_list exploded
#     categories_context_df = metadf.filter(col('ID') == id).select('ID', 'asins', 'titles', 'groups', explode('categories_list').alias('category'))
    
#     return context_df, similars_context_df, categories_context_df
    

In [13]:
# #Returns out categories
# def getSimilars(id):
#     # get the row matching the input ID
#     row = metadf.filter(col('ID') == id).select('ID', 'asins', 'titles', 'groups', 'similars_list', 'categories_list').first()
    
#     # create a dataframe with similars_list exploded
#     similars_context_df = metadf.filter(col('ID') == id).select('ID', 'asins', 'titles', 'groups', explode('similars_list').alias('similar_asins'))
    
#     # match the asins in the similars_list with asins in the metadf
#     # matches = metadf.join(similars_context_df, metadf.asins == similars_context_df.similar_asins, 'inner')
#     matches = metadf.alias("m").join(similars_context_df.alias("s"), col("m.asins") == col("s.similar_asins"), "inner")

    
#     # select the desired columns and explode the categories_list
#     # out_categories_context_df = matches.select('ID', 'asins', 'titles', 'groups', explode('categories_list').alias('category'))
#     out_categories_context_df = matches.select(metadf.ID, metadf.asins, metadf.titles, metadf.groups, explode('categories_list').alias('category'))

    
#     return out_categories_context_df

In [14]:
# def getRelevantCategory(group):
    
#     if group is None:
#         # use categories_context_df obtained from getContext(id)
#         relevant_category_df = categories_context_df
#     else:
#         # use out_categories_context_df obtained from getSimilars(id)
#         out_categories_context_df = getSimilars(id)[3]
#         relevant_category_df = out_categories_context_df.filter(col('group') == group)

#     # calculate the maximum values for salesrank and total_reviews
#     max_salesrank = relevant_category_df.agg({"salesrank": "max"}).collect()[0][0]
#     max_total_reviews = relevant_category_df.agg({"total_reviews": "max"}).collect()[0][0]

#     # calculate the score using min-max normalization
#     relevant_category_df = relevant_category_df.withColumn("score", \
#                 (col("salesrank") / max_salesrank + col("total_reviews") / max_total_reviews + col("average_rating")) / 3)

#     # count the occurrences of each category
#     category_counts = relevant_category_df.groupBy('category').count()

#     # sort the categories by count in descending order
#     sorted_categories = category_counts.sort(desc('count'))

#     # filter the categories by the score threshold
#     filtered_categories = sorted_categories.sort(desc('score'))

#     return filtered_categories


In [15]:
# def getRecommendation(id, group, categories, top_n):
#     if id is None:
#         print("Invalid input: id cannot be null")
#         return
    
#     # get context, similars, and categories dataframes
#     context_df, similars_context_df, categories_context_df = getContext(id)
#     out_categories_context_df = getSimilars(id)

#     # get relevant categories
#     relevant_categories_df = getRelevantCategory(group)

#     # filter categories by input categories
#     if categories is not None:
#         categories = [x.lower() for x in categories]
#         relevant_categories_df = relevant_categories_df.filter(lower(col('category')).isin(categories))

#     # get top n recommendations
#     recommendations = context_df.union(similars_context_df).join(
#         relevant_categories_df, on='asin', how='inner'
#     ).select('asin', 'title', 'score').distinct().sort(desc('score')).limit(top_n)

#     # show top n recommendations
#     recommendations.show(truncate=False)


3. Make a getRelevantCategory(group, top_n, score) function as:
if input group== Null, consider relevantcategory = categories_context_df obtained from getContext(id)
if input group is not Null, relevantcategory = filter by matching the input group with the groups in the out_categories_context_df obtained from getSimilars(id) 


In [36]:
cat_df= metadf.select('ID', 'asins', 'titles','groups','avg_rating','salesranks','total_reviews', F.explode('categories_list').alias('category')).distinct()
cat_df = cat_df.withColumnRenamed("category", "cat_category")

In [37]:
def getRelevantCategory(id, group):
    if group is None:
        categories_context_df = metadf.filter(col('ID') == id).select('ID', 'asins', 'titles', 'salesranks', 'total_reviews', 'avg_rating', 'groups', explode('categories_list').alias('category'))
        relevant_category_df = categories_context_df
    else:
        similars_context_df = metadf.filter(col('ID') == id).select('ID', 'asins', 'titles', 'salesranks', 'total_reviews', 'avg_rating', 'groups', 'categories_list', explode('similars_list').alias('similar_asins'))
        matches_df = similars_context_df.filter(col('similar_asins').isin(metadf.select('asins').rdd.flatMap(lambda x: x).collect()))
        out_categories_context_df = matches_df.select('ID', 'asins', 'titles', 'groups', 'total_reviews', 'avg_rating', 'salesranks', explode('categories_list').alias('category'))
        out_categories_context_df = out_categories_context_df.filter((col('groups') == group))
        relevant_category_df = out_categories_context_df.distinct()
    return relevant_category_df

# getRelevantCategory(11, "Book")

In [38]:
def getRecommendation(id, group, categories, top_n):
    if id is None:
        print("Invalid input: id cannot be null")
        return

    # get relevant categories
    sorted_categories = getRelevantCategory(id,group)
    # sorted_categories.show()
    # sorted_categories.printSchema()

    # filter the categories in sorted_categories that match with those in cat_df

    filtered_categories = sorted_categories.filter(col('category').isin(cat_df.select('cat_category').rdd.flatMap(lambda x: x).collect()))
    filtered_categories = filtered_categories.filter((col('groups') == group))

    # fetch the input category details from cat_df and append them to the filtered_categories dataframe
    for category in categories:
        input_category_details = cat_df.filter(col("cat_category") == category)
        filtered_categories = filtered_categories.union(input_category_details)

    recommendations = filtered_categories.withColumn("score", \
                ((10000000-col("salesranks")) / 10000000 + col("total_reviews") / 100 + col("avg_rating")) / 3)
    recommendations= recommendations.sort(desc('score')).limit(top_n)
    # recommendations.show(truncate=False)
    # recommendations.printSchema()
    return recommendations



    # # show top n recommendations
    # recommendations.show(10, truncate=False)
    # relevant_categories_df.show(truncate= False)

In [23]:
# def getRecommendation(id, group, categories, top_n):
#     if id is None:
#         print("Invalid input: id cannot be null")
#         return

#     # get relevant categories
#     sorted_categories = getRelevantCategory(id,group)

#     # # Add input category to sorted_categories
#     # new_row = spark.createDataFrame([(None, None, None, None, None, None, None, categories)], ["ID", "asins", "titles", "groups", "total_reviews", "avg_rating", "salesranks", "category"])
#     # sorted_categories = sorted_categories.union(new_row)
    
#     schema = StructType([
#         StructField("ID", StringType(), True),
#         StructField("asins", StringType(), True),
#         StructField("titles", StringType(), True),
#         StructField("groups", StringType(), True),
#         StructField("total_reviews", IntegerType(), True),
#         StructField("avg_rating", DoubleType(), True),
#         StructField("salesranks", StringType(), True),
#         StructField("category", StringType(), True)
#     ])
#     new_row = spark.createDataFrame([(None, None, None, None, None, None, None, categories)], schema)
#     sorted_categories = sorted_categories.union(new_row)

#     # filter the categories in sorted_categories that match with those in cat_df
#     filtered_categories = sorted_categories.filter(col('category').isin(cat_df.select('cat_category').rdd.flatMap(lambda x: x).collect()))

#     # calculate the score using min-max normalization
#     recommendations = filtered_categories.withColumn("score", \
#                 ((10000000-col("salesranks")) / 10000000 + col("total_reviews") / 100 + col("avg_rating")) / 3)
#     recommendations= recommendations.sort(desc('score')).limit(top_n)

#     return recommendations


In [24]:
getRecommendation(id='26', group="Book", categories= "   |Books[283155]|Subjects[1000]|Children's Books[4]|Ages 4-8[2785]|General[170062]", top_n=5)


+---+-----+------+----------+-------------+----------+------+---------------+-------------+
|ID |asins|titles|salesranks|total_reviews|avg_rating|groups|categories_list|similar_asins|
+---+-----+------+----------+-------------+----------+------+---------------+-------------+
+---+-----+------+----------+-------------+----------+------+---------------+-------------+

root
 |-- ID: integer (nullable = true)
 |-- asins: string (nullable = true)
 |-- titles: string (nullable = true)
 |-- salesranks: double (nullable = true)
 |-- total_reviews: integer (nullable = true)
 |-- avg_rating: float (nullable = true)
 |-- groups: string (nullable = true)
 |-- categories_list: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- similar_asins: string (nullable = true)



DataFrame[ID: string, asins: string, titles: string, groups: string, total_reviews: int, avg_rating: double, salesranks: string, category: string, score: double]

In [15]:
getRecommendation(id='26', group="Book", categories= "   |Books[283155]|Subjects[1000]|Children's Books[4]|Ages 4-8[2785]|General[170062]", top_n=5)


+---+-----+------+------+-------------+----------+----------+--------+-----+
|ID |asins|titles|groups|total_reviews|avg_rating|salesranks|category|score|
+---+-----+------+------+-------------+----------+----------+--------+-----+
+---+-----+------+------+-------------+----------+----------+--------+-----+

root
 |-- ID: integer (nullable = true)
 |-- asins: string (nullable = true)
 |-- titles: string (nullable = true)
 |-- groups: string (nullable = true)
 |-- total_reviews: integer (nullable = true)
 |-- avg_rating: float (nullable = true)
 |-- salesranks: double (nullable = true)
 |-- category: string (nullable = false)
 |-- score: double (nullable = true)



DataFrame[ID: int, asins: string, titles: string, groups: string, total_reviews: int, avg_rating: float, salesranks: double, category: string, score: double]

In [72]:
getRecommendation(id='11', group="Music", categories= '|Books[283155]|Subjects[1000]|Health, Mind & Body[10]|Alternative Medicine[4696]|General[4701]', top_n=5)
getRecommendation(id='19', group="DVD", categories= '   |[139452]|DVD[130]|Genres[404276]|Science Fiction & Fantasy[163431]|Fantasy[163440]', top_n=5)

+----------+--------------------------------------+----------+-------------+----------+------------------+
|asins     |titles                                |salesranks|total_reviews|avg_rating|score             |
+----------+--------------------------------------+----------+-------------+----------+------------------+
|0385504209|The Da Vinci Code                     |19.0      |3049         |3.5       |11.663332699999998|
|0792151712|Titanic                               |5735.0    |1817         |3.5       |7.5564755         |
|1590073991|Path of Daggers (The Wheel of Time, 8)|569310.0  |1688         |3.0       |6.941023          |
|1565113306|Fight Club                            |359382.0  |551          |4.5       |3.6580206         |
|B0000296JB|Make Yourself                         |2439.0    |545          |4.5       |3.6499187         |
+----------+--------------------------------------+----------+-------------+----------+------------------+

+-----+------+----------+-----------

In [23]:
#import necessary modules
from pyspark import *
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pyspark.sql.functions as F
from Metadataload import metadataload
metadf= metadataload()
# metadf.show()
cat_df= metadf.select('ID', 'asins', 'titles','groups','avg_rating','salesranks','total_reviews', F.explode('categories_list').alias('category')).distinct()
cat_df = cat_df.withColumnRenamed("category", "cat_category")
def getRelevantCategory(id, group):
    if group is None:
        categories_context_df = metadf.filter(col('ID') == id).select('ID', 'asins', 'titles', 'salesranks', 'total_reviews', 'avg_rating', 'groups', explode('categories_list').alias('category'))
        relevant_category_df = categories_context_df
    else:
        similars_context_df = metadf.filter(col('ID') == id).select('ID', 'asins', 'titles', 'salesranks', 'total_reviews', 'avg_rating', 'groups', 'categories_list', explode('similars_list').alias('similar_asins'))
        matches_df = similars_context_df.filter(col('similar_asins').isin(metadf.select('asins').rdd.flatMap(lambda x: x).collect()))
        out_categories_context_df = matches_df.select('ID', 'asins', 'titles', 'groups', 'total_reviews', 'avg_rating', 'salesranks', explode('categories_list').alias('category'))
        out_categories_context_df = out_categories_context_df.filter((col('groups') == group))
        relevant_category_df = out_categories_context_df.distinct()
    return relevant_category_df

def getRecommendation(id, group, categories, top_n):
    if id is None:
        print("Invalid input: id cannot be null")
        return

    # get relevant categories
    sorted_categories = getRelevantCategory(id,group)
    # filter the categories in sorted_categories that match with those in cat_df
    filtered_categories = sorted_categories.filter(col('category').isin(cat_df.select('cat_category').rdd.flatMap(lambda x: x).collect()))
    # recommendations= filtered_categories.select('asins', 'titles', 'salesranks', 'total_reviews', 'avg_rating')
    # calculate the score using min-max normalization
    recommendations = filtered_categories.withColumn("score", \
                ((10000000-col("salesranks")) / 10000000 + col("total_reviews") / 100 + col("avg_rating")) / 3)
    recommendations= recommendations.sort(desc('score')).limit(top_n)
    # recommendations.show(truncate=False)
    # recommendations.printSchema()
    recommendations= recommendations.select("asins", "title")
    recommendations.show(truncate=False)
    return recommendations


In [39]:
getRecommendation(id='19', group="DVD", categories= '   |[139452]|DVD[130]|Genres[404276]|Science Fiction & Fantasy[163431]|Fantasy[163440]', top_n=5)


DataFrame[ID: int, asins: string, titles: string, groups: string, total_reviews: int, avg_rating: float, salesranks: double, category: string, score: double]

In [42]:
recommendations= getRecommendation(id='19', group="DVD", categories= '   |[139452]|DVD[130]|Genres[404276]|Science Fiction & Fantasy[163431]|Fantasy[163440]', top_n=5)

In [43]:
recommendations.show()

Py4JJavaError: An error occurred while calling o40580.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 33.0 failed 1 times, most recent failure: Lost task 1.0 in stage 33.0 (TID 201) (AP-v6p9hhLDJqoL.bfl.com executor driver): java.net.SocketException: Software caused connection abort: recv failed
	at java.net.SocketInputStream.socketRead0(Native Method)
	at java.net.SocketInputStream.socketRead(Unknown Source)
	at java.net.SocketInputStream.read(Unknown Source)
	at java.net.SocketInputStream.read(Unknown Source)
	at java.io.BufferedInputStream.fill(Unknown Source)
	at java.io.BufferedInputStream.read(Unknown Source)
	at java.io.DataInputStream.readInt(Unknown Source)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:76)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:68)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:53)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:438)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2066)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:272)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.net.SocketException: Software caused connection abort: recv failed
	at java.net.SocketInputStream.socketRead0(Native Method)
	at java.net.SocketInputStream.socketRead(Unknown Source)
	at java.net.SocketInputStream.read(Unknown Source)
	at java.net.SocketInputStream.read(Unknown Source)
	at java.io.BufferedInputStream.fill(Unknown Source)
	at java.io.BufferedInputStream.read(Unknown Source)
	at java.io.DataInputStream.readInt(Unknown Source)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:76)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:68)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:53)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:438)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2066)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:272)
