In [81]:
!pip install pyspark
!pip install findspark



In [83]:
import findspark 
findspark.init()

In [84]:
import os
import pandas as pd

In [85]:
import pyspark
from pyspark import SparkConf
from pyspark.context import SparkContext

In [86]:
from pyspark.sql.session import SparkSession
import pyspark.sql.functions as psf
from pyspark.sql.functions import lit,col,sum, avg, max, first, min, mean

In [90]:
# create spark connection

def create_spark_connection(**kwargs):
    app_name = kwargs.get('app_name','SparkDemo')
    master = kwargs.get('master','yarn')
    ui_port = kwargs.get('ui_port','4040')
    driver_port = kwargs.get('driver_port','8887')
    cores_max = kwargs.get('cores_max','4')
    executor_cores = kwargs.get('executor_cores','4')
    driver_memory = kwargs.get('driver_memory','8g') 
    executor_memory = kwargs.get('executor_memory','8g')
    dynamicAllocation = kwargs.get('dynamicAllocation','false')
    aqe = kwargs.get('aqe', 'true')
    sql_shuffle_partitions = kwargs.get('sql_shuffle_partitions',200)

    
    os.environ['PYSPARK_PYTHON'] = './spark_exc_env/spark_env/bin/python' 
    os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3'

    conf = SparkConf()  
     
    conf.setAppName(app_name)
    conf.set('spark.master', master)
    
    # setting spark ports
    conf.set('spark.ui.port', ui_port)
    conf.set('spark.driver.port', driver_port)
    
    # setting spark cores
    conf.set('spark.cores.max', cores_max)
    conf.set('spark.executor.cores', executor_cores)
    
    # setting spark memory
    
    conf.set('spark.driver.memory',driver_memory)
    conf.set('spark.executor.memory', executor_memory)
    
    # setting 
    conf.set("spark.dynamicAllocation.enabled", dynamicAllocation)
    
    # setting spark sql properties
    conf.set("spark.shuffle.service.enabled", "true")
    conf.set('spark.sql.shuffle.partitions', sql_shuffle_partitions)
    conf.set('spark.sql.adaptive.coalescePartitions.enabled', aqe)
    conf.set("spark.sql.parquet.compression.codec", "gzip")

    # Use your own compressed package here: mine lies in /opt/miniconda/envs/spark_exc_env.tgz path.
    conf.set("spark.yarn.dist.archives","file:/opt/miniconda/envs/spark_exc_env.tgz#spark_exc_env")



    conf.set("spark.executor.memoryOverhead", '2g')
    conf.set("spark.shuffle.memoryFraction", '1g')

    
    # building spark session
    spark = SparkSession.builder \
    .config(conf=conf) \
    .enableHiveSupport() \
    .getOrCreate()
    
    sc = spark.sparkContext
    sc.setLogLevel('WARN')
    
    return spark, sc

def close_spark_connection(sc):
    sc.stop()

In [91]:
spark, sc = create_spark_connection(app_name='SparkDemo')
spark

22/10/08 12:12:37 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [92]:
sc.getConf().getAll()

[('spark.driver.extraJavaOptions',
  '-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED'),
 ('spark.app.startTime', '1665151036849'),
 ('spark.driver.host', 'ip-172-31-55-137.ec2.internal'),
 ('spark.driver.port', '43333'),
 ('spark.executor.id', 'driver'),
 ('spark.app.submitTime', '1665151036689'),
 ('spark.rdd.compress'

## Data Fetching

In [71]:
path = 'data.csv'
df  = spark.read.csv(path,header=True,inferSchema=True)

In [72]:

df_pandas = pd.read_csv('data.csv')

In [73]:
df.count(), df_pandas.shape

(169139, (51483, 6))

In [74]:
df

DataFrame[headline: string, category: string, date: string, views: string, article: string, link: string]

In [75]:
df.printSchema()

root
 |-- headline: string (nullable = true)
 |-- category: string (nullable = true)
 |-- date: string (nullable = true)
 |-- views: string (nullable = true)
 |-- article: string (nullable = true)
 |-- link: string (nullable = true)



In [76]:
df.show()

+--------------------+--------------------+-----------------+-----+--------------------+--------------------+
|            headline|            category|             date|views|             article|                link|
+--------------------+--------------------+-----------------+-----+--------------------+--------------------+
|የኦሊምፒክ ማጣሪያ ተሳታፊዎ...|                ስፖርት| January 14, 2021|    2|ብርሃን ፈይሳየኢትዮጵያ ቦክ...|https://www.press...|
|          አዲስ ዘመን ድሮ|                መዝናኛ|December 28, 2020|    4| የአዲስ ዘመን ጋዜጣ ቀደም...|                null|
|      መንገድ በመከተል አልፎ|                null|             null| null|                null|                null|
|      አልፎ በሚገኙት ሰፈሮች|                null|             null| null|                null|                null|
|       ብዙዎች የልኳንዳ ሥጋ|                null|             null| null|                null|                null|
|የተሰቀለባቸውና   ፉርኖ ይ...|https://www.press...|             null| null|                null|                null|
|የአረንጓዴ ጎር

In [77]:
sample_text = df.rdd.takeSample(True,1,seed=0)

In [78]:
sample_text

[Row(headline='‹የማይነጋ መስሏት…› የሚል አንድ አባባል አለ። አዎ! እንደ ጨለመ አይቀርምና በጨለማ አልታይም ብለውና ተደፋፍረው ያደረጉት ነገር ብርሃን ሲወጣ ይጋለጣል። በአባይ ግድብ ድርድር ላይ የተከሰተው ይህን መሰል ሁኔታ ነው። አሜሪካ ከዕለታት አንድ ቀን በድንገት ነሽጧት ፈላጭ ቆራጭ ለመሆን አልሞከረችም። በኢትዮጵያ ፖለቲከኞችና ጋዜጠኞች ሲደበቅላት የነበረውን ጣልቃ ገብነቷ ነው ገሃድ የወጣው። የብልጽግና ባለሥልጣናትና ጋዜጠኞቻቸው የአሜሪካን ወደ ድርድሩ መቀላቀል የወዳጅነት ጨዋታ አስመስለው ሲነግሩን ሰንብተዋል። አሜሪካ ገንዘቧን፣ የባለሥልጣናቷን ጊዜና ዕውቀት እንዲያው በከንቱ እያባከነች እንደሆነ አድርገን እንድናስብ ሲወተውቱን ከርመዋል። የአሜሪካን ከነኮተቷ ወደ ድርድሩ መግባት የተቃወሙና የተቹ ወገኖችንም ለመቆጣት፣ ለማሸማቀቅና ዕውቀት የጎደላቸው አስመስለው ለማሳየት ቃጥቷቸዋል።ነገር ግን ጉልበተኛዋ አሜሪካ፣ በተለይም ለጥቁሮች ንቀቱን በግላጭ የሚያሳየው የትራምፕ ቡድን ለብልጽግና ፓርቲ ባለሥልጣናት ብሎ መዋሸትም ሆነ ነገሩን ለረዥም ጊዜ ለማድበስበስ የሚሞክርበት ምንም ምክንያት የለውምና፣ ወደ ድርድሩ የገባበትን ጉዳይ አፍረጥርጦታል። አሜሪካ ለግብጽ ይጠቅማል ብላ የሞነጫጨረችውን ጉድ የያዘ ወረቀት ወደ ብልጽግና አለቆች በመላክ እንደ ሕጻን በቀጭን ትዕዛዝ ልታስፈርማቸው ከጅላለች። በአባይ ወንዝ ጉዳይ ያለ እሷ ይሁንታ የኢትዮጵያ መንግሥት አንዳች ነገር እንዳያደርግ ዛቻ ሰንዝራለች።ይህ ሁኔታ የብልጽግና ቡድን የዲፕሎማሲ አያያዝ ክፉኛ የዳሸቀ መሆኑን ያጋለጠ ነው። የብልጽግና ካድሬዎችና ጋዜጠኞች በዚህ ኪሳራና ውርደት ውስጥ የጠቅላይ ሚኒስትሩ መንግሥት ጠንካራ፣ አይበገሬና ለማንም የማይንበረከክ እንደሆነ አድርጎ የሚያሳይ ድራማ መሥሪያ የሚሆ