# Set Up / Initialise PySpark

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
import findspark
import os
import sys

# Tell Spark where the JDK and the unpacked Spark distribution live.
# NOTE(review): both paths are hard-coded; the Spark path assumes the tarball
# was extracted under /content — confirm when running outside that layout.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
})

# Must run after the variables above are set and before pyspark is imported.
findspark.init()

# Create a Custom Logger class

### Methods:
- \_\_init\_\_ : initializes the in-memory log buffer (`log_entry`)
- log_commit : writes the buffered log to a text file and clears the buffer
- log_print : same as the Python print function, but also appends the text to the log buffer
- log_printSchema : same as the PySpark printSchema function, but also appends the schema to the log buffer
- log_show : same as the PySpark show function, but also appends the table to the log buffer (defaults: 2 rows, cell values truncated to 10 characters)


In [3]:
class Logger:
  """Echo text to stdout while buffering it, and write the buffer to files on demand.

  Every ``log_*`` method prints its text and appends the same text to
  ``log_entry``. ``log_commit`` writes the buffer to a ``.txt`` file and then
  clears it, so each committed file holds only what was logged since the
  previous commit.

  Attributes:
    log_entry: text logged since the last commit.
    f_buff: base name (no extension) used by the most recent commit, or None.
    dup_no: number of commits made so far under ``f_buff``.
  """

  # Column width applied by log_show when truncate is True.
  _DEFAULT_TRUNCATE_WIDTH = 10

  def __init__(self):
    """Start with an empty buffer and no commit history."""
    self.log_entry = ""
    self.f_buff = None
    self.dup_no = 0
    # Commits made per base name. Tracking every name (not only the last one)
    # keeps an earlier file from being overwritten when names are interleaved.
    self._commit_counts = {}

  def log_commit(self, log_name):
    """Write the buffered log to a text file and clear the buffer.

    The first commit under a base name goes to ``<base>.txt``; later commits
    under the same base name go to ``<base>_1.txt``, ``<base>_2.txt``, ...
    for the lifetime of this Logger, even if other names were committed in
    between.

    Args:
      log_name: target file name or path. Only the final extension is
        dropped, so dots elsewhere in the path are preserved
        (``"./logs/run.v2.txt"`` -> ``"./logs/run.v2"``).

    Returns:
      The path of the file that was written.

    Raises:
      OSError: if the file cannot be opened or written. The buffer is left
        untouched in that case.
    """
    f_name = os.path.splitext(log_name)[0]
    commit_no = self._commit_counts.get(f_name, 0)

    if commit_no == 0:
      target = f_name + '.txt'
    else:
      target = f_name + '_' + str(commit_no) + '.txt'

    # Context manager closes the handle even if write() raises; an explicit
    # encoding keeps non-ASCII cell values from depending on the locale.
    with open(target, 'w', encoding='utf-8') as f:
      f.write(self.log_entry)

    self.log_entry = ""
    self._commit_counts[f_name] = commit_no + 1
    self.f_buff = f_name
    self.dup_no = commit_no + 1
    return target

  def log_print(self, *log_str_tuple):
    """Print the arguments like ``print`` and append the same line to the buffer.

    Arguments are converted with ``str`` and joined by single spaces, with no
    leading space, followed by a newline.
    """
    log_str = " ".join(str(item) for item in log_str_tuple) + "\n"

    self.log_entry = self.log_entry + log_str
    sys.stdout.write(log_str)

  def log_printSchema(self, dataframe):
    """Print and buffer the schema tree of a PySpark dataframe."""
    log_str = dataframe._jdf.schema().treeString()
    self.log_print(log_str)

  def log_show(self, dataframe, rows = 2, truncate = True, vertical = False):
    """Print and buffer the first rows of a PySpark dataframe.

    Args:
      dataframe: the PySpark dataframe to render.
      rows: maximum number of rows to render.
      truncate: ``True`` truncates cell values to 10 characters; a falsy
        value disables truncation; any other value is converted with
        ``int`` and used as the width.
      vertical: passed through to the renderer to request one line per
        column value instead of a table.
    """
    if truncate is True:
      width = self._DEFAULT_TRUNCATE_WIDTH
    elif not truncate:
      width = 0
    else:
      width = int(truncate)
    log_str = dataframe._jdf.showString(rows, width, vertical)
    self.log_print(log_str)

In [4]:
from pyspark.sql import SparkSession

# Demo data: three (language, fee) rows.
columns = ["language", "fee"]
data = [("Java", 20000), ("Python", 10000), ("Scala", 10000)]

# Local Spark session for the logger demo.
spark = (
    SparkSession.builder
    .master('local[1]')
    .appName('custom_logger')
    .getOrCreate()
)

# Create the frame first, then rename its columns positionally.
df = spark.createDataFrame(data).toDF(*columns)

In [5]:
# One Logger instance is shared by all the demo cells that follow.
logger = Logger()

In [6]:
# A single argument, then several arguments that log_print joins with spaces.
logger.log_print('This is a print statement')
logger.log_print('This', 'is', 'another')

 This is a print statement
 This is another


In [7]:
# Log a heading, then the dataframe's schema tree.
logger.log_print('===== Printing the schema of a dataframe =====')
logger.log_printSchema(df)

 ===== Printing the schema of a dataframe =====
 root
 |-- language: string (nullable = true)
 |-- fee: long (nullable = true)



In [8]:
# Defaults: at most 2 rows, cell values truncated to 10 characters.
logger.log_print('===== Printing a few rows of a dataframe =====')
logger.log_show(df)

# Ask for 3 rows and switch truncation off.
logger.log_print('===== Printing selected rows and disabling truncation of data in a dataframe =====')
logger.log_show(df, 3, False)

 ===== Printing a few rows of a dataframe =====
 +--------+-----+
|language|  fee|
+--------+-----+
|    Java|20000|
|  Python|10000|
+--------+-----+
only showing top 2 rows

 ===== Printing selected rows and disabling truncation of data in a dataframe =====
 +--------+-----+
|language|fee  |
+--------+-----+
|Java    |20000|
|Python  |10000|
|Scala   |10000|
+--------+-----+



In [9]:
# First commit under the name 'log': writes the buffered text to log.txt and clears the buffer.
logger.log_commit('log')

In [10]:
# Committing under the same name again writes to log_1.txt rather than overwriting log.txt.
logger.log_print('Creating a new log with the same name')
logger.log_commit('log')

 Creating a new log with the same name


In [11]:
# A different name starts a new file: new_log.txt.
logger.log_print('Creating a new log with a different name')
logger.log_commit('new_log')

 Creating a new log with a different name
