In [2]:
from pyspark import SparkContext as sc
from pyspark.sql import SparkSession as ss

In [3]:
# Obtain (or reuse) the SparkContext and SparkSession for this kernel.
# getOrCreate() replaces the direct constructor calls so the cell can be re-run:
# after the first run `sc` and `ss` are rebound from the imported classes to
# instances, and calling an instance again (`sc()`) fails. Constructing a second
# SparkContext while one is already active fails as well.
# Both getOrCreate entry points stay reachable through the instances.
sc = sc.getOrCreate()
ss = ss.builder.getOrCreate()

In [4]:
# Load the raw superhero CSV from the local filesystem. The first row supplies
# the column names and Spark infers each column's type from the data.
sup1 = (
    ss.read
    .options(header=True, inferSchema=True)
    .csv('file:///etc/dataset/superhero.csv')
)

In [6]:
# describe() returns a DataFrame of summary statistics; the notebook renders
# only that frame's repr, so the output below is its schema (note the extra
# `summary` column), not the statistics. Chain .show() to display the values.
sup1.describe()

DataFrame[summary: string, _c0: string, name: string, Gender: string, Eye color: string, Race: string, Hair color: string, Height: string, Publisher: string, Skin color: string, Alignment: string, Weight: string]

In [21]:
# Rename the three columns whose names contain a space to snake_case.
column_renames = {
    'Eye color': 'eye_color',
    'Hair color': 'hair_color',
    'Skin color': 'skin_color',
}
new_df = sup1
for old_name, new_name in column_renames.items():
    new_df = new_df.withColumnRenamed(old_name, new_name)

In [22]:
# Sanity check on the rename: the repr of the summary frame lists the column
# names, which should now read eye_color / hair_color / skin_color.
new_df.describe()

DataFrame[summary: string, _c0: string, name: string, Gender: string, eye_color: string, Race: string, hair_color: string, Height: string, Publisher: string, skin_color: string, Alignment: string, Weight: string]

In [23]:
# Persist the renamed frame as Parquet on the local filesystem.
# The explicit file:// scheme matches the path the following read cell uses;
# without it the path is resolved against the cluster's default filesystem,
# which need not be the local disk. mode('overwrite') lets the cell be re-run
# (the default save mode raises if the target already exists).
new_df.write.mode('overwrite').parquet('file:///etc/dataset/superhero.parquet')

In [24]:
# Read the Parquet copy back from the local filesystem.
from_parquet = (
    ss.read
    .format('parquet')
    .load('file:///etc/dataset/superhero.parquet')
)

In [25]:
# Round-trip check: the summary frame's schema should list the same columns as
# new_df.describe() did before the write.
from_parquet.describe()

DataFrame[summary: string, _c0: string, name: string, Gender: string, eye_color: string, Race: string, hair_color: string, Height: string, Publisher: string, skin_color: string, Alignment: string, Weight: string]

In [28]:
# Column names as read back from Parquet. Only the three renamed columns are
# snake_case at this point; the others still carry their original capitalisation.
from_parquet.columns

['_c0',
 'name',
 'Gender',
 'eye_color',
 'Race',
 'hair_color',
 'Height',
 'Publisher',
 'skin_color',
 'Alignment',
 'Weight']

In [29]:
# Normalise every column name in one pass: lower-case it and replace spaces
# with underscores.
from_parquet = from_parquet.toDF(
    *[name.lower().replace(' ', '_') for name in from_parquet.columns]
)

In [30]:
# Confirm the rename: every column in the summary frame's schema should now be
# lower-case.
from_parquet.describe()

DataFrame[summary: string, _c0: string, name: string, gender: string, eye_color: string, race: string, hair_color: string, height: string, publisher: string, skin_color: string, alignment: string, weight: string]

In [36]:
from pyspark.ml.feature import StringIndexer

In [31]:
from pyspark.ml.feature import OneHotEncoder

In [37]:
# Categorical columns that get string-indexed and then one-hot encoded.
to_parse = [
    'gender',
    'eye_color',
    'race',
    'hair_color',
    'publisher',
    'skin_color',
    'alignment',
]

In [44]:
# Discard every row that has a null in any column before indexing.
from_parquet = from_parquet.na.drop()

In [45]:
# One StringIndexer covering all categorical columns; each output column is
# named after its input with an 'index' suffix.
indexer = StringIndexer(
    inputCols=to_parse,
    outputCols=[name + 'index' for name in to_parse],
)

In [46]:
# Fit the indexer on the cleaned frame, then apply it to the same frame to
# append the *index columns.
indexer_model = indexer.fit(from_parquet)
indexed = indexer_model.transform(from_parquet)

In [47]:
# Check that the seven *index columns were appended; only the schema of the
# summary frame is rendered here.
indexed.describe()

DataFrame[summary: string, _c0: string, name: string, gender: string, eye_color: string, race: string, hair_color: string, height: string, publisher: string, skin_color: string, alignment: string, weight: string, genderindex: string, hair_colorindex: string, skin_colorindex: string, raceindex: string, eye_colorindex: string, publisherindex: string, alignmentindex: string]

In [52]:
# Number of rows remaining after dropna() and indexing.
indexed.count()

719

In [54]:
# One-hot encode every indexed categorical column.
# Both column lists are derived from `to_parse`: the previous version read the
# input names from `to_onehot`, which no cell in this notebook defines, so the
# cell raised NameError on a fresh kernel. Building both lists from the same
# source also guarantees that '<col>index' is paired with '<col>vec'.
encoder = OneHotEncoder(
    inputCols=[name + 'index' for name in to_parse],
    outputCols=[name + 'vec' for name in to_parse],
)

In [55]:
# Fit the encoder on the indexed frame; fit() returns the fitted model used to
# transform the data in the next cell.
model = encoder.fit(indexed)

In [56]:
# Apply the fitted encoder: `done` is `indexed` plus one *vec column per
# encoded categorical column.
done = model.transform(indexed)

In [58]:
# The summary schema below does not list the *vec columns even though
# done.show() does: describe() only covers numeric and string columns.
done.describe()

DataFrame[summary: string, _c0: string, name: string, gender: string, eye_color: string, race: string, hair_color: string, height: string, publisher: string, skin_color: string, alignment: string, weight: string, genderindex: string, hair_colorindex: string, skin_colorindex: string, raceindex: string, eye_colorindex: string, publisherindex: string, alignmentindex: string]

In [59]:
# Print the first rows of the encoded frame, including the *vec columns.
done.show()

+---+-----------------+------+---------+-----------------+----------+------+-----------------+----------+---------+------+-----------+---------------+---------------+---------+--------------+--------------+--------------+--------------+---------------+--------------+-------------+---------------+-------------+---------------+
|_c0|             name|gender|eye_color|             race|hair_color|height|        publisher|skin_color|alignment|weight|genderindex|hair_colorindex|skin_colorindex|raceindex|eye_colorindex|publisherindex|alignmentindex|  eye_colorvec|   publishervec| hair_colorvec|    gendervec|        racevec| alignmentvec|  skin_colorvec|
+---+-----------------+------+---------+-----------------+----------+------+-----------------+----------+---------+------+-----------+---------------+---------------+---------+--------------+--------------+--------------+--------------+---------------+--------------+-------------+---------------+-------------+---------------+
|  0|           

In [60]:
# Encoding only appends columns, so this should equal indexed.count().
done.count()

719

In [63]:
# Repeat of the earlier done.describe() cell; `done` has not changed in between.
done.describe()

DataFrame[summary: string, _c0: string, name: string, gender: string, eye_color: string, race: string, hair_color: string, height: string, publisher: string, skin_color: string, alignment: string, weight: string, genderindex: string, hair_colorindex: string, skin_colorindex: string, raceindex: string, eye_colorindex: string, publisherindex: string, alignmentindex: string]

In [73]:
# Build a small lookup frame with one row per row of `done`:
#   idx   -> 0 .. n_rows-1
#   alpha -> the character whose code point is 97 + idx ('a', 'b', 'c', ...)
# The length is taken from `done` instead of a hard-coded 719 so the two frames
# cannot drift apart if the upstream filtering changes.
n_rows = done.count()
idx = ss.createDataFrame(
    [(i, chr(97 + i)) for i in range(n_rows)],
    schema=['idx', 'alpha'],
)

In [74]:
# Preview the lookup frame (show() prints only the first 20 rows).
idx.show()

+---+-----+
|idx|alpha|
+---+-----+
|  0|    a|
|  1|    b|
|  2|    c|
|  3|    d|
|  4|    e|
|  5|    f|
|  6|    g|
|  7|    h|
|  8|    i|
|  9|    j|
| 10|    k|
| 11|    l|
| 12|    m|
| 13|    n|
| 14|    o|
| 15|    p|
| 16|    q|
| 17|    r|
| 18|    s|
| 19|    t|
+---+-----+
only showing top 20 rows



In [81]:
# Attach the lookup columns to the encoded frame by matching idx.idx against
# done._c0, then drop the now-redundant _c0 key and print the result.
# NOTE(review): this is an inner join, and _c0 presumably holds the original
# CSV row ids. If dropna() removed rows, some _c0 values may fall outside
# idx's range and be silently dropped; confirm the joined row count.
(
    idx
    .join(done, on=idx.idx == done._c0)
    .drop('_c0')
    .show()
)

+---+-----+-----------------+------+---------+-----------------+----------+------+-----------------+----------+---------+------+-----------+---------------+---------------+---------+--------------+--------------+--------------+--------------+---------------+--------------+-------------+---------------+-------------+---------------+
|idx|alpha|             name|gender|eye_color|             race|hair_color|height|        publisher|skin_color|alignment|weight|genderindex|hair_colorindex|skin_colorindex|raceindex|eye_colorindex|publisherindex|alignmentindex|  eye_colorvec|   publishervec| hair_colorvec|    gendervec|        racevec| alignmentvec|  skin_colorvec|
+---+-----+-----------------+------+---------+-----------------+----------+------+-----------------+----------+---------+------+-----------+---------------+---------------+---------+--------------+--------------+--------------+--------------+---------------+--------------+-------------+---------------+-------------+---------------

In [83]:
from pyspark.sql.functions import Column

In [85]:
from pyspark.sql import functions as F

In [86]:
from pyspark.sql import types as T

In [87]:
def isMarvel(val):
    """Return 1 if the publisher name mentions Marvel, otherwise 0.

    The check is a case-insensitive substring match, so 'Marvel Comics' and
    'MARVEL' both count. The previous version returned the inverse (0 for
    Marvel), contradicting both the function name and the `is_marvel` column
    it populates.

    Args:
        val: Publisher name, or None when the publisher is missing.

    Returns:
        int: 1 when 'marvel' occurs in ``val`` (ignoring case), else 0.
        A missing publisher (None) yields 0 instead of raising.
    """
    if val is None:
        return 0
    return 1 if 'marvel' in val.lower() else 0
    
# Expose the Python predicate to Spark as a UDF that yields an integer column.
udfMarvel = F.udf(isMarvel, returnType=T.IntegerType())

# Append the is_marvel flag computed from the publisher text, then remove the
# raw publisher column it was derived from.
marvel_emp = done.withColumn('is_marvel', udfMarvel(F.col('publisher')))
marvel_emp = marvel_emp.drop('publisher')

In [88]:
# Preview the result: `publisher` is gone and `is_marvel` is the last column.
marvel_emp.show()

+---+-----------------+------+---------+-----------------+----------+------+----------+---------+------+-----------+---------------+---------------+---------+--------------+--------------+--------------+--------------+---------------+--------------+-------------+---------------+-------------+---------------+---------+
|_c0|             name|gender|eye_color|             race|hair_color|height|skin_color|alignment|weight|genderindex|hair_colorindex|skin_colorindex|raceindex|eye_colorindex|publisherindex|alignmentindex|  eye_colorvec|   publishervec| hair_colorvec|    gendervec|        racevec| alignmentvec|  skin_colorvec|is_marvel|
+---+-----------------+------+---------+-----------------+----------+------+----------+---------+------+-----------+---------------+---------------+---------+--------------+--------------+--------------+--------------+---------------+--------------+-------------+---------------+-------------+---------------+---------+
|  0|           A-Bomb|  Male|   yellow|