In [1]:
import os
import sys

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

# Point PySpark at the interpreter that is running this kernel.
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)

# Reuse an already-running session if there is one, otherwise start "marvel1".
spark = (
    SparkSession.builder
    .appName("marvel1")
    .getOrCreate()
)
# Eager evaluation: a DataFrame left as a cell's last expression is rendered as a table.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

## Names

In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Two required columns: an integer id and a string name.
names_schema = StructType(
    [
        StructField("id", IntegerType(), False),
        StructField("name", StringType(), False),
    ]
)

# Read the space-delimited names file with the explicit schema and expose it
# to the SQL cells as the "names" view.
df_names = (
    spark.read
    .schema(names_schema)
    .option("delimiter", " ")
    .csv("marvel_names.txt")
)
df_names.createOrReplaceTempView("names")
df_names

id,name
1,24-HOUR MAN/EMMANUEL
2,3-D MAN/CHARLES CHAN
3,4-D MAN/MERCURIO
4,8-BALL/
5,A
6,A'YIN
7,"ABBOTT, JACK"
8,ABCISSA
9,ABEL
10,ABOMINATION/EMIL BLO


## Graph

In [11]:
# Load the graph file as plain text: one row per line in a single "value" column.
# The SQL cells query it through the "vector" view.
df = spark.read.format("text").load("marvel_graph.txt")
df.createOrReplaceTempView("vector")
df

value
5988 748 1722 375...
5989 4080 4264 44...
5982 217 595 1194...
5983 1165 3836 43...
5980 2731 3712 15...
5981 3569 5353 40...
5986 2658 3712 26...
5987 2614 5716 17...
5984 590 4898 745...
5985 3233 2254 21...


In [39]:
# For every id that starts a line of the "vector" view, count the ids that follow it
# and list the result from the smallest count to the largest.
spark.sql("""

with line_tab as (
  -- trim first: leading/trailing blanks would otherwise become empty array
  -- elements after split() and inflate the count
  select trim(value) as line
    from vector
),

array_tab as (
  select split(line, " ") as ids
    from line_tab
),

heros_rank as (
  -- element 1 is the hero's own id, so it is excluded from the count;
  -- counts are summed in case the same id starts more than one line
  select int(element_at(ids, 1)) as hero_id,
         sum(array_size(ids) - 1) as rank
    from array_tab
   group by hero_id
)

-- the ordering belongs on the outermost query: a sort inside a CTE is not
-- guaranteed to carry over to the final result
select *
  from heros_rank
 order by rank, hero_id

""")

hero_id,rank
467,1
577,1
835,1
1089,1
1408,1
1841,1
2117,1
2139,1
2911,1
3014,1


In [48]:
# Look up the names of the ids whose count (number of ids that follow them on
# their lines of the "vector" view) equals the smallest count in the data.
spark.sql("""

with line_tab as (
  -- trim first: leading/trailing blanks would otherwise become empty array
  -- elements after split() and inflate the count
  select trim(value) as line
    from vector
),

array_tab as (
  select split(line, " ") as ids
    from line_tab
),

heros_rank as (
  -- element 1 is the hero's own id, so it is excluded from the count;
  -- counts are summed in case the same id starts more than one line
  select int(element_at(ids, 1)) as hero_id,
         sum(array_size(ids) - 1) as cnt
    from array_tab
   group by hero_id
),

-- single-row relation holding the smallest count
minimum as (
  select min(cnt) as value
    from heros_rank
)

-- keep only the ids at the minimum, then attach their names;
-- ids without an entry in "names" are dropped by the inner join
select names.id, names.name, heros_rank.cnt
  from heros_rank
  join minimum
    on heros_rank.cnt = minimum.value
  join names
    on names.id = heros_rank.hero_id
 order by hero_id

""")

id,name,cnt
467,BERSERKER II,1
577,BLARE/,1
835,"CALLAHAN, DANNY",1
1089,CLUMSY FOULUP,1
1408,DEATHCHARGE,1
1841,FENRIS,1
2117,"GERVASE, LADY ALYSSA",1
2139,"GIURESCU, RADU",1
2911,KULL,1
3014,"JOHNSON, LYNDON BAIN",1
