In [None]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("MyApp") \
    .config("spark.driver.host", "localhost") \
    .getOrCreate()

default_conf = spark.sparkContext._conf.getAll()
print(default_conf)

conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '4g'),
                                        ('spark.app.name', 'Spark Updated Conf'),
                                        ('spark.executor.cores', '4'),
                                        ('spark.cores.max', '4'),
                                        ('spark.driver.memory','4g')])

spark.sparkContext.stop()

spark = SparkSession \
    .builder \
    .appName("MyApp") \
    .config(conf=conf) \
    .getOrCreate()


default_conf = spark.sparkContext._conf.get("spark.cores.max")
print("updated configs " , default_conf)


conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '4g'), ('spark.app.name', 'Spark Updated Conf'), ('spark.executor.cores', '4'), ('spark.cores.max', '4'), ('spark.driver.memory','4g')])

docker cp wordcount.txt master:/root/
docker cp basicsparksubmit.py master:/root/

docker exec master /opt/hadoop/bin/hdfs dfs -put /root/wordcount.txt /
docker exec master /opt/hadoop/bin/hdfs dfs -put /root/basicsparksubmit.py /
docker exec master /opt/hadoop/bin/hdfs dfs -ls /

In [19]:
%%writefile step1_preprocess.py
# %load 4_MongoDB/step1_preprocess.py
## Load Packages

import pyspark
from pyspark.sql import SparkSession  
from pyspark.sql.types import *
from pyspark.sql.functions import *

### Configure spark session
spark = SparkSession\
   .builder\
   .master('spark://master:7077')\
   .appName('quake_etl')\
   .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1')\
   .getOrCreate()

#spark.sparkContext._conf.getAll()

# Load the dataset 
df_load = spark.read.csv('hdfs://master:9000/database.csv', header=True)

# Drop fields we don't need from df_load
lst_dropped_columns = ['Depth Error', 'Time', 'Depth Seismic Stations','Magnitude Error','Magnitude Seismic Stations','Azimuthal Gap', 'Horizontal Distance','Horizontal Error',
    'Root Mean Square','Source','Location Source','Magnitude Source','Status']
df_load = df_load.drop(*lst_dropped_columns)

# Create a year field and add it to the dataframe
df_load = df_load.withColumn('Year', year(to_timestamp('Date', 'dd/MM/yyyy')))

# Build the quakes frequency dataframe using the year field and counts for each year
df_quake_freq = df_load.groupBy('Year').count().withColumnRenamed('count', 'Counts')

# Cast some fields from string into numeric types
df_load = df_load.withColumn('Latitude', df_load['Latitude'].cast(DoubleType()))\
    .withColumn('Longitude', df_load['Longitude'].cast(DoubleType()))\
    .withColumn('Depth', df_load['Depth'].cast(DoubleType()))\
    .withColumn('Magnitude', df_load['Magnitude'].cast(DoubleType()))

# Create avg magnitude and max magnitude fields and add to df_quake_freq
df_max = df_load.groupBy('Year').max('Magnitude').withColumnRenamed('max(Magnitude)', 'Max_Magnitude')
df_avg = df_load.groupBy('Year').avg('Magnitude').withColumnRenamed('avg(Magnitude)', 'Avg_Magnitude')

# Join df_max, and df_avg to df_quake_freq
df_quake_freq = df_quake_freq.join(df_avg, ['Year']).join(df_max, ['Year'])

# Remove nulls
df_load.dropna()
df_quake_freq.dropna()

# Build the tables/collections in mongodb
# Write df_load to mongodb
df_load.write.format('mongo')\
    .mode('overwrite')\
    .option('spark.mongodb.output.uri', 'mongodb://root:go2team@mongo:27017/Quake.quakes?authSource=admin').save()

# Write df_quake_freq to mongodb
df_quake_freq.write.format('mongo')\
    .mode('overwrite')\
    .option('spark.mongodb.output.uri', 'mongodb://root:go2team@mongo:27017/Quake.quake_freq?authSource=admin').save()

spark.stop()


Writing step1_preprocess.py


In [11]:
%%writefile basicsparksubmit.py

from pyspark import SparkContext, SparkConf
conf = (SparkConf().setMaster("spark://master:7077")
    .set("spark.executor.cores", "1")
    .set("spark.cores.max", "2")
    .set('spark.executor.memory', '1g')
)

sc = SparkContext(conf=conf)

logFilepath = "hdfs://master:9000/wordcount.txt"  
logData = sc.textFile(logFilepath).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))


Overwriting basicsparksubmit.py


In [None]:
%%writefile basicsparksubmit.py

from pyspark.sql import SparkSession

conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '4g'),
                                        ('spark.app.name', 'Spark Updated Conf'),
                                        ('spark.executor.cores', '2'),
                                        ('spark.cores.max', '2'),
                                        ('spark.driver.memory','4g')])

spark = SparkSession \
    .builder \
    .appName("MyApp") \
    .config(conf=conf) \
    .getOrCreate()


In [12]:
! docker cp basicsparksubmit.py master:/root/

In [13]:
! docker exec master /opt/hadoop/bin/hdfs dfs -rm /basicsparksubmit.py

Deleted /basicsparksubmit.py


In [14]:
! docker exec master /opt/hadoop/bin/hdfs dfs -put /root/basicsparksubmit.py /

In [16]:
! docker exec master /opt/hadoop/bin/hdfs dfs -cat /basicsparksubmit.py


from pyspark import SparkContext, SparkConf
conf = (SparkConf().setMaster("spark://master:7077")
    .set("spark.executor.cores", "1")
    .set("spark.cores.max", "2")
    .set('spark.executor.memory', '1g')
)

sc = SparkContext(conf=conf)

logFilepath = "hdfs://master:9000/wordcount.txt"  
logData = sc.textFile(logFilepath).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))


In [17]:
%%writefile ../../dags/dag_basicsparksubmit.py
import airflow
from datetime import timedelta
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator 
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',    
    #'start_date': airflow.utils.dates.days_ago(2),
    # 'end_date': datetime(),
    # 'depends_on_past': False,
    # 'email': ['airflow@example.com'],
    # 'email_on_failure': False,
    #'email_on_retry': False,
    # If a task fails, retry it once after waiting
    # at least 5 minutes
    #'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag_spark = DAG(
        dag_id = "sparkSubmitOperator",
        default_args=default_args,
        # schedule_interval='0 0 * * *',
        schedule_interval='@once',
        dagrun_timeout=timedelta(minutes=60),
        description='use case of sparkoperator in airflow',
        start_date = airflow.utils.dates.days_ago(1)
)

spark_submit_local = SparkSubmitOperator(
    application ='hdfs://master:9000/basicsparksubmit.py' ,
    conn_id= 'spark_default', 
    task_id='spark_submit_task', 
    dag=dag_spark
    )

spark_submit_local

if __name__ == "__main__":
    dag_spark.cli()

Overwriting dags/dag_basicsparksubmit.py
