In [1]:
#create spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark import SparkConf, SparkContext

#create config 

config = SparkConf().setAppName("MySparkApp")
context = SparkContext(conf=config) 

# Create SparkSession 
spark = SparkSession.builder \
      .master("local[1]") \
      .appName("Datenbanken mit Spark") \
      .getOrCreate() # getOrCreate liefert existierende Session wenn es schon eine gibt


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/12 09:30:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
#test if spark is running 
spark

In [3]:
    ## tsv to dataFrame , for csv remove sep t , header option puts first line into header##
df= spark.read.option("sep","\t").csv('bands.tsv',header=True, inferSchema=True)
df.show()

                                                                                

+----+----------+--------------------+--------------------+--------------------+-----+----+
|name|        id|              member|               group|                role|start| end|
+----+----------+--------------------+--------------------+--------------------+-----+----+
|null|/m/062vhlh|   Hank Williams III|            Antiseen|                null| null|null|
|null|/m/0654bxy|         John Cooper|             Skillet|Lead vocalist,Aco...| 1996|null|
|null|/m/05kgt58|     Christoph Kohli|                Span|  Bass guitar,Vocals| null|null|
|null|/m/05147hs|     Rodrigo Aravena|         Men at Work|Bass guitar,backi...| 2000|null|
|null|/m/05cm2fy|       Vincent Kenis|The Honeymoon Kil...|                null| null|null|
|null|/m/05nqgx6|      Derek Lee Rock|    Suburban Legends|                null| null|null|
|null|/m/05nqdz6|        Emil Johnson|          Black Flag|                null| null|null|
|null|/m/05nnk5v|          Jeff Plate|            Savatage|                null|

In [6]:
    ## merge df and df2 and clean

    ##prepare data##
df2= spark.read.option("sep","\t").csv('bands2.tsv',header=True, inferSchema=True)


    # union by name so it doesnt depend on position# 
#output= df.unionByName(df2, allowMissingColumns=True).dropDuplicates()

    #clean data of nulls
output = output.fillna(0)
output = output.na.fill("")
output = output.drop("name").withColumnRenamed("member","Name")
output.show()

+----+----+---+-----+----+-----+---+---+-----+----+-----+---+
|Name|Name| id|group|role|start|end| id|group|role|start|end|
+----+----+---+-----+----+-----+---+---+-----+----+-----+---+
+----+----+---+-----+----+-----+---+---+-----+----+-----+---+



In [None]:
    # join to merge data for dependencies
output= df.join(df2,how="inner")
output = df.join(df2, on=['member','member']) 

In [None]:
    ##write dataframe to file with tab  ##
output.write.option("header",True).option("delimiter","\t").csv("output/")

In [5]:
    #ADD COLUMN with data
output.withColumn("im Number 5", lit(5)).show()

+----------+--------------------+--------------------+--------------------+-----+----+-----------+
|        id|                Name|               group|                role|start| end|im Number 5|
+----------+--------------------+--------------------+--------------------+-----+----+-----------+
|/m/05nnk5v|          Jeff Plate|            Savatage|                    |     |    |          5|
|/m/04xwczh|   Sérgio Pencarinha|          Cinemuerte|            Drum kit| 2008|    |          5|
|/m/05nqdz6|        Emil Johnson|          Black Flag|                    |     |    |          5|
|/m/05kgt58|     Christoph Kohli|                Span|  Bass guitar,Vocals|     |    |          5|
|/m/05nn6cy|         Neil Citron|          Quiet Riot|                    |     |    |          5|
|/m/05nn7vv|Rowland Charles G...|            Level 42|                    |     |    |          5|
|/m/05nqgx6|      Derek Lee Rock|    Suburban Legends|                    |     |    |          5|
|/m/0654bx

In [None]:
    ## SPARK READ FILES AND USE SOME SQL Basics##
    #read local file
textFile = spark.read.text("new.txt")

    # EXAMPLE GET MAX AMOUNT OF WORDS IN maxWords or in ONE LINE output #
maxWords=textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max(col("numWords")))
textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max(col("numWords"))).show(100,False)


## some interesting prompts for SQL
#
#
#x = {
#  size() counts the stuff inside
#  split(input,seperators) splits text at seperator
#  name("ColumName") rename colum
#  max()
#  min()
#  avg()
#  countDistinct()
#}
# .select([x])
# .agg(x) can be used to further use aggregations 
# .distinct(x)
# .collect() puts table into one line 
# .withColumnRenamed(from,to)

In [None]:
    ## EXAMPLE GET SINGLE WORDS FROM FILE
dfText = spark.read.text("frankenstein.txt")

df = dfText.select(split(dfText.value,"[^a-zA-Z]").alias("word"))
df = df.select(explode(col("word")).alias("word"))
df = df.select(lower(col("word")).alias("word"))
df = df.select(regexp_extract(col("word"),"[a-z]{2,}|a|i",0).alias("word"))
df.show(50,False)