In [1]:
import os
import re
import shutil
import argparse
import numpy as np
import pandas as pd
from heapq import nlargest
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.conf import SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import HiveContext
from collections import defaultdict
from pyspark.sql.functions import udf
from pyspark.context import SparkContext
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import CountVectorizer, IDF, Tokenizer
from pyspark.mllib.linalg import Vectors, SparseVector, DenseVector

In [2]:
# version 1
# def read_line(line):
#     if line is None:
#         return None
#     line = line.split('\t')[-1]
#     line = re.sub('[^A-Za-z_]', '', line)
#     line = re.sub(' +', ' ', line.replace('_', ' '))
#     return line.strip()

In [3]:
# version 1
# def read_data(sc, data_path):
#     if sc is None or data_path is None:
#         return None
#     documents = sc.textFile(data_path).map(lambda line : read_line(line))
#     schema = StructType([StructField("activity", StringType(), True)])
#     use this if the row has only one column
#     documents = documents.map (lambda x: Row(x))
#     documents = SQLContext(sc).createDataFrame(documents, schema)
    
#     tokenizer = Tokenizer(inputCol="activity", outputCol="tokens")
#     documents = tokenizer.transform(documents)

#     remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")
#     documents = remover.transform(documents)
#     return documents

In [4]:
def read_line(row):
    if row is None:
        return None
    row = row.split(',')[0]
    row_formatted = re.sub('[^A-Za-z_]', '', row)
    row_formatted = re.sub(' +', ' ', row_formatted.replace('_', ' '))
    return row, row_formatted.strip()

In [5]:
def read_data(sc, data_path):
    if sc is None or data_path is None:
        return None
    documents = sc.textFile(data_path).map(lambda line : read_line(line))
    schema = StructType([StructField("raw_activity", StringType(), True),
                         StructField("activity", StringType(), True)])
    documents = SQLContext(sc).createDataFrame(documents, schema)  
    tokenizer = Tokenizer(inputCol="activity", outputCol="tokens")
    documents = tokenizer.transform(documents)

    remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")
    documents = remover.transform(documents)
    return documents

In [6]:
def tf_idf_transform(documents):
    if documents is None:
        return None
    countVectorizer = CountVectorizer(inputCol="filtered_tokens", outputCol="feature_counts")
    countVectorizerModel = countVectorizer.fit(documents)
    documents = countVectorizerModel.transform(documents)
    idf = IDF(inputCol="feature_counts", outputCol="features")
    idfModel = idf.fit(documents)
    documents = idfModel.transform(documents)
    return countVectorizerModel.vocabulary, documents
    

In [7]:
def clustering(documents, output_path, k=2, max_iter=20):
    if documents is None or output_path is None:
        return None
    output_path = os.getcwd() + "/" + output_path + "/" + str(k)
    if os.path.exists(output_path):
        shutil.rmtree(output_path) 
    os.makedirs(output_path)
    out_file = open(os.path.join(output_path, "cost.txt"), "w")
    kmeans = KMeans(featuresCol="features").setK(k).setMaxIter(max_iter)
    km_model = kmeans.fit(documents)
    clustersTable = km_model.transform(documents)
    clusterCenters = km_model.clusterCenters()
    wssse = km_model.computeCost(documents)
    out_file.write("Cluster {}".format(k))
    out_file.write("\n")
    out_file.write("cost:" + str(wssse))
    out_file.write("\n")
    return clustersTable, clusterCenters

In [8]:
def extract(row):
    return (row.prediction, ) + tuple(row.features.toArray().tolist())

In [9]:
def sparse_add(v1, v2):
    #assert isinstance(v1, SparseVector) and isinstance(v2, SparseVector)
    assert v1.size == v2.size
    values = defaultdict(float) # Dictionary with default value 0.0
    # Add values from v1
    for i in range(v1.indices.size):
        values[v1.indices[i]] += v1.values[i]
    # Add values from v2
    for i in range(v2.indices.size):
        values[v2.indices[i]] += v2.values[i]
    return Vectors.sparse(v1.size, dict(values))

In [10]:
def sparse_divide(v):
    #assert isinstance(v, SparseVector) 
    values = defaultdict(float) # Dictionary with default value 0.0
     # Add values from v[0]
    for i in range(v[0].indices.size):
        values[v[0].indices[i]] = v[0].values[i] / float(v[1])
    return Vectors.sparse(v[0].size, dict(values))

In [11]:
def get_top_index(v, k):
    #assert isinstance(v, SparseVector)
    dct = {}
    for i in range(v.indices.size):
        dct[v.indices[i]] = v.values[i]
    return nlargest(k, dct, key=dct.get)
    

In [12]:
# Very Basic implementation of top k words
# def get_top_keywords(clustersTable, vocab, output_path, n_terms=10, k=2):
#     if clustersTable is None or vocab is None or output_path is None:
#         return None
#     if not os.path.exists(output_path):
#         os.makedirs(output_path)
#     out_path = os.getcwd() + "/" + output_path + "/" + str(k)
#     out_file = open(os.path.join(out_path, "out.txt"), "w")
#     clusters = clustersTable.select("features","prediction")
#     clusters = clusters.rdd.map(extract).toDF(["prediction"]) 
#     clusters = clusters.groupby("prediction").mean()
#     clusters = clusters.drop("prediction")
#     clusters = clusters.drop("avg(prediction)")
#     clusters_array = np.array(clusters.collect())
#     for idx, row in enumerate(clusters_array):
#         out_file.write("cluster {}".format(idx))
#         out_file.write("\n")
#         out_file.write(",".join([vocab[t] for t in np.argsort(row)[-n_terms:]]))
#         out_file.write("\n")
#     out_file.close()
#     return None         

In [13]:
# Basic implementation of top k words
def get_top_keywords(clustersTable, vocab, output_path, n_terms=10, k=2):
    if clustersTable is None or vocab is None or output_path is None:
        return None
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    out_path = os.getcwd() + "/" + output_path + "/" + str(k)
    out_file = open(os.path.join(out_path, "out.txt"), "w")
    clusters = clustersTable.select("prediction", "features")
    clusters_array = clusters.rdd \
        .mapValues(lambda v: (v.toArray(), 1)) \
        .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])) \
        .mapValues(lambda v: v[0]/v[1]).collect()
    for row in clusters_array:
        out_file.write("cluster {}".format(row[0]))
        out_file.write("\n")
        out_file.write(",".join([vocab[t] for t in np.argsort(row[1])[-n_terms:]]))
        out_file.write("\n")
    out_file.close()
    return clusters_array

In [14]:
# Advanced implementation of top k words
def get_top_keywords_test(clustersTable, vocab, output_path, n_terms=10, k=2):
    if clustersTable is None or vocab is None or output_path is None:
        return None
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    out_path = os.getcwd() + "/" + output_path + "/" + str(k)
    out_file = open(os.path.join(out_path, "out.txt1"), "w")
    clusters = clustersTable.select("prediction", "features")
    clusters_array = clusters.rdd \
        .mapValues(lambda v: (v, 1)) \
        .reduceByKey(lambda a,b: (sparse_add(a[0], b[0]), a[1]+b[1])) \
        .mapValues(lambda v: sparse_divide(v)) \
        .mapValues(lambda v: get_top_index(v, n_terms)).collect()
    for row in clusters_array:
        out_file.write("cluster {}".format(row[0]))
        out_file.write("\n")
        out_file.write(",".join([vocab[t] for t in row[1]]))
        out_file.write("\n")
    out_file.close()
    return clusters_array

In [15]:
def run_clustering(sc, input_path, output_path, num_clusters_list, max_iter=20):
    if sc is None or input_path is None or num_clusters_list is None or output_path is None:
        return None
    documents = read_data(sc, input_path)
    vocab, documents = tf_idf_transform(documents)
    for k in num_clusters_list:
        clustersTable, clusterCenters = clustering(documents, output_path, k, max_iter)
        write_results(sc, clustersTable, output_path, k)
        #get_top_keywords(clustersTable, vocab, output_path, max_iter, k)
        get_top_keywords_test(clustersTable, vocab, output_path, max_iter, k)
    return clustersTable, clusterCenters, documents

In [16]:
def write_results(sc, clustersTable, output_path, k):
    if sc is None or clustersTable is None or output_path is None:
        return None
    output_path = output_path + '/' + str(k) + '/results'
    if os.path.exists(output_path):
        shutil.rmtree(output_path) 
    resultsdf = clustersTable.select('activity', 'prediction')
    resultsdf.write.format('csv').option('delimiter', '\t').option('header', 'true').save(output_path)
    return None

In [17]:
dist = udf(lambda features, prediction: np.subtract(features.toArray().tolist(), clusterCenters[prediction]), ArrayType(IntegerType()))
#sqrt_dist = udf(lambda features, prediction: features - clusterCenters[prediction])




In [18]:
#sc = SparkContext(conf=SparkConf().setAppName("tf-idf_clustering_on_spark"))
clustersTable, clusterCenters, documents = run_clustering(sc, 'test_data_bck', 'output', [3])
# clustersTable = clustersTable.withColumn('dist', dist(clustersTable.features, clustersTable.prediction))
# clustersTable.show(1)
# print(clusterCenters[0])


In [19]:
clusters = clustersTable.select("prediction", "features")

In [20]:
clusters.show()

+----------+--------------------+
|prediction|            features|
+----------+--------------------+
|         0|(349,[0,12,52,84,...|
|         0|(349,[0,1,2,7,8,9...|
|         0|(349,[0,1,2,8,66,...|
|         0|(349,[0,1,38,39,3...|
|         0|(349,[0,1,2,7,8,9...|
|         0|(349,[0,1,11,117]...|
|         0|(349,[0,1,38,39,2...|
|         0|(349,[0,1,2,8,66,...|
|         0|(349,[0,1,2,7,8,9...|
|         0|(349,[0,1,2,7,8,9...|
|         0|(349,[0,1,38,39,1...|
|         0|(349,[0,1,2,7,8,9...|
|         0|(349,[0,1,2,7,8,9...|
|         0|(349,[0,1,2,7,8,9...|
|         0|(349,[0,1,38,39,2...|
|         0|(349,[0,1,11,117]...|
|         0|(349,[0,1,2,7,8,9...|
|         0|(349,[0,1,2,7,8,9...|
|         0|(349,[0,1,38,39,3...|
|         0|(349,[0,1,2,7,8,9...|
+----------+--------------------+
only showing top 20 rows



In [65]:
documents.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|        raw_activity|            activity|              tokens|     filtered_tokens|      feature_counts|            features|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|https://www.nytim...|httpswwwnytimesco...|[httpswwwnytimesc...|[httpswwwnytimesc...|(349,[0,12,52,84,...|(349,[0,12,52,84,...|
|https://www.walma...|httpswwwwalmartco...|[httpswwwwalmartc...|[httpswwwwalmartc...|(349,[0,1,2,7,8,9...|(349,[0,1,2,7,8,9...|
|https://www.walma...|httpswwwwalmartco...|[httpswwwwalmartc...|[httpswwwwalmartc...|(349,[0,1,2,8,66,...|(349,[0,1,2,8,66,...|
|https://www.walma...|httpswwwwalmartco...|[httpswwwwalmartc...|[httpswwwwalmartc...|(349,[0,1,38,39,3...|(349,[0,1,38,39,3...|
|https://www.walma...|httpswwwwalmartco...|[httpswwwwalmartc...|[httpswwwwalmartc...|(349,[0,1,2,7,8,9..

In [125]:
#features = np.array(documents.select('features').rdd.flatMap(lambda x: x).collect())
first = documents.select("features").head(1)
print(first)
features = np.array(documents.select("features").rdd.map(lambda row: row.features.toArray()).collect())
#features = documents.select("features").rdd.map(lambda row: (row.features.indices, row.features.values)).collect()
features


[Row(features=SparseVector(349, {0: 0.04, 12: 4.6251, 52: 7.033, 84: 7.033, 116: 7.8439, 138: 7.8439, 252: 3.922, 322: 3.922, 339: 3.922}))]


array([[0.04000133, 0.        , 0.        , ..., 0.        , 0.        ,
        0.        ],
       [0.04000133, 2.42784627, 3.14119616, ..., 0.        , 0.        ,
        0.        ],
       [0.04000133, 2.42784627, 3.14119616, ..., 0.        , 0.        ,
        0.        ],
       ...,
       [0.02000067, 0.        , 0.        , ..., 0.        , 0.        ,
        0.        ],
       [0.04000133, 0.        , 0.        , ..., 0.        , 0.        ,
        0.        ],
       [0.04000133, 0.        , 0.        , ..., 0.        , 0.        ,
        0.        ]])

In [130]:
from pyspark.mllib.linalg.distributed import RowMatrix
mat = RowMatrix(documents.select("features").rdd.map(list))
print(mat.numRows())
print(mat.numCols())
mat.rows.collect()

100
349


[DenseVector([0.04, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 4.6251, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 7.033, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 7.033, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 7.8439, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 7.8439, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.

In [42]:
from pyspark.sql.types import *
vector_udf = udf(lambda vector: vector.toArray().tolist(),ArrayType(DoubleType()))

In [43]:
colvalues = documents.select(vector_udf('features').alias('features')).collect()

In [46]:
features= np.array(list(map(lambda x:x.features,colvalues)))


array([[0.04000133, 0.        , 0.        , ..., 0.        , 0.        ,
        0.        ],
       [0.04000133, 2.42784627, 3.14119616, ..., 0.        , 0.        ,
        0.        ],
       [0.04000133, 2.42784627, 3.14119616, ..., 0.        , 0.        ,
        0.        ],
       ...,
       [0.02000067, 0.        , 0.        , ..., 0.        , 0.        ,
        0.        ],
       [0.04000133, 0.        , 0.        , ..., 0.        , 0.        ,
        0.        ],
       [0.04000133, 0.        , 0.        , ..., 0.        , 0.        ,
        0.        ]])

In [87]:
features.shape

(100, 349)

In [41]:
features.shape

()

In [88]:
tops = features.max(axis=0)
bots = features.min(axis=0)

In [89]:
tops

array([ 0.04000133,  2.42784627,  3.14119616, 20.2854318 ,  3.81414063,
        4.10034232, 18.03409563,  4.10034232,  1.39624469,  4.10034232,
        4.26042773,  1.61938824,  4.62507085, 12.02273042,  4.83579188,
        4.83579188,  4.83579188,  5.33842074, 12.02273042,  5.33842074,
        5.33842074,  5.6467221 ,  5.6467221 ,  5.6467221 ,  5.6467221 ,
        5.6467221 ,  6.01136521,  6.01136521, 14.06603291,  6.01136521,
        6.01136521,  6.01136521,  6.01136521,  6.01136521,  6.01136521,
        6.01136521,  6.01136521, 14.06603291,  2.53567898,  2.53567898,
        2.66921037,  5.6467221 ,  2.66921037,  2.66921037,  2.66921037,
        6.45765231,  6.45765231,  2.66921037,  6.45765231,  5.6467221 ,
        6.01136521,  2.82336105, 10.54952468,  2.82336105,  2.82336105,
        2.82336105,  2.82336105,  2.82336105,  7.03301646,  7.03301646,
        3.0056826 ,  3.0056826 ,  7.03301646,  7.03301646,  7.03301646,
        7.03301646,  7.03301646,  7.03301646,  7.03301646,  7.03

In [30]:
tops.size

349

In [90]:
bots

array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0.

In [67]:
from pyspark.ml.linalg import SparseVector
from operator import attrgetter

df = sc.parallelize([
    (SparseVector(3, [0, 2], [1.0, 3.0]), ),
    (SparseVector(3, [1], [4.0]), )
]).toDF(["features"])

In [58]:
features = df.rdd.map(attrgetter("features"))

In [57]:
features.collect()

[SparseVector(3, {0: 1.0, 2: 3.0}), SparseVector(3, {1: 4.0})]

In [59]:
indexed_features = features.zipWithIndex()

In [66]:
indexed_features.collect()

[(SparseVector(3, {0: 1.0, 2: 3.0}), 0), (SparseVector(3, {1: 4.0}), 1)]

In [61]:
def explode(row):
    vec, i = row
    for j, v in zip(vec.indices, vec.values):
        yield i, j, v

entries = indexed_features.flatMap(explode)

In [62]:
entries.collect()

[(0, 0, 1.0), (0, 2, 3.0), (1, 1, 4.0)]

In [69]:
from pyspark.ml.linalg import SparseVector

df = sc.parallelize([
    (SparseVector(3, [0, 2], [1.0, 3.0]), ),
    (SparseVector(3, [1], [4.0]), )
]).toDF(["features"])

In [70]:
def explode(row):
    for i, v in zip(row.indices, row.values):
        yield i, v

In [73]:
df.rdd.flatMap(lambda v : explode(v)).collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 47.0 failed 1 times, most recent failure: Lost task 7.0 in stage 47.0 (TID 136, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/shabha/Documents/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/types.py", line 1556, in __getattr__
    idx = self.__fields__.index(item)
ValueError: 'indices' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/shabha/Documents/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/Users/shabha/Documents/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/shabha/Documents/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-70-28c884d02894>", line 2, in explode
  File "/Users/shabha/Documents/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/types.py", line 1561, in __getattr__
    raise AttributeError(item)
AttributeError: indices

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:938)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:162)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/shabha/Documents/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/types.py", line 1556, in __getattr__
    idx = self.__fields__.index(item)
ValueError: 'indices' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/shabha/Documents/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/Users/shabha/Documents/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/shabha/Documents/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-70-28c884d02894>", line 2, in explode
  File "/Users/shabha/Documents/spark-2.3.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/types.py", line 1561, in __getattr__
    raise AttributeError(item)
AttributeError: indices

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
