In [1]:
import os

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [15]:
import os

# Export the JDK location and the unpacked Spark distribution directory
# through the process environment.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
    }
)

In [16]:
# Pin the PySpark package to 3.1.1, the same version as the Spark distribution downloaded earlier.
# The recorded output shows this replaces an already-installed pyspark 3.3.2.
# NOTE(review): presumably the kernel has to be restarted afterwards so the pinned version is the one imported — confirm.
!pip install pyspark=='3.1.1'

Collecting pyspark==3.1.1
  Using cached pyspark-3.1.1-py2.py3-none-any.whl
Collecting py4j==0.10.9 (from pyspark==3.1.1)
  Using cached py4j-0.10.9-py2.py3-none-any.whl (198 kB)
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.9.5
    Uninstalling py4j-0.10.9.5:
      Successfully uninstalled py4j-0.10.9.5
  Attempting uninstall: pyspark
    Found existing installation: pyspark 3.3.2
    Uninstalling pyspark-3.3.2:
      Successfully uninstalled pyspark-3.3.2
Successfully installed py4j-0.10.9 pyspark-3.1.1


In [1]:
import pyspark

In [2]:
import findspark

In [3]:
# Initialise findspark.
# NOTE(review): pyspark is already imported before this cell, and SPARK_HOME is set in an earlier
# cell; presumably this call depends on that variable still being set in the current kernel — verify.
findspark.init()

In [4]:
from pyspark.sql import SparkSession

In [5]:
# Create (or reuse) the SparkSession for this notebook, running Spark locally.
# Built unconditionally: every later cell uses `spark`, so its definition must not
# depend on a `__name__ == "__main__"` check.
spark = (
    SparkSession.builder
    .appName("myapplication")
    .master("local[*]")
    .getOrCreate()
)


In [6]:
# Echo the session object as the cell's output.
spark

In [7]:
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,DecimalType,FloatType,DateType

In [8]:
# Explicit schema applied when reading the transactions CSV.
# Every column keeps StructField's default nullability.
tnx_schema = (
    StructType()
    .add("tnx_id", IntegerType())
    .add("tnx_dt", DateType())
    .add("cid", IntegerType())
    .add("amount", DecimalType(10, 2))
    .add("prod_cat", StringType())
    .add("prod", StringType())
    .add("city", StringType())
    .add("state", StringType())
    .add("mode", StringType())
)

In [9]:
# PERMISSIVE mode keeps every row of the file. A value that cannot be parsed into its
# declared type is loaded as null, and the other fields of that row are left untouched.
# In this file the first row's amount is not a valid decimal, so only that one field
# comes back as null.
tnx_df = (
    spark.read
    .schema(tnx_schema)
    .options(header=True, dateFormat="MM-dd-yyyy", mode="PERMISSIVE")
    .csv("/content/txns_with_header_alter.csv")
)

In [10]:
# Print the schema attached to the DataFrame.
tnx_df.printSchema()

root
 |-- tnx_id: integer (nullable = true)
 |-- tnx_dt: date (nullable = true)
 |-- cid: integer (nullable = true)
 |-- amount: decimal(10,2) (nullable = true)
 |-- prod_cat: string (nullable = true)
 |-- prod: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- mode: string (nullable = true)



In [11]:
# Preview the rows loaded in PERMISSIVE mode.
tnx_df.show()

+------+----------+-------+------+--------------------+--------------------+--------------+--------------+------+
|tnx_id|    tnx_dt|    cid|amount|            prod_cat|                prod|          city|         state|  mode|
+------+----------+-------+------+--------------------+--------------------+--------------+--------------+------+
|     0|2011-06-26|4007024|  null|  Exercise & Fitness|Cardio Machine Ac...|   Clarksville|     Tennessee|credit|
|     1|2011-05-26|4006742|198.44|  Exercise & Fitness|Weightlifting Gloves|    Long Beach|    California|credit|
|     2|2011-06-01|4009775|  5.58|  Exercise & Fitness|Weightlifting Mac...|       Anaheim|    California|credit|
|     3|2011-06-05|4002199|198.19|          Gymnastics|    Gymnastics Rings|     Milwaukee|     Wisconsin|credit|
|     4|2011-12-17|4002613| 98.81|         Team Sports|        Field Hockey|   Nashville  |     Tennessee|credit|
|     5|2011-02-14|4007591|193.63|  Outdoor Recreation|Camping & Backpac...|       Chica

In [12]:
# DROPMALFORMED mode removes an entire row as soon as at least one of its fields does
# not match the declared data type. Here that discards the first record (tnx_id 0).
tnx_df = (
    spark.read
    .schema(tnx_schema)
    .options(header=True, mode="DROPMALFORMED", dateFormat="MM-dd-yyyy")
    .csv("/content/txns_with_header_alter.csv")
)

In [13]:
# Preview the rows that survived the DROPMALFORMED read.
tnx_df.show()

+------+----------+-------+------+--------------------+--------------------+--------------+--------------+------+
|tnx_id|    tnx_dt|    cid|amount|            prod_cat|                prod|          city|         state|  mode|
+------+----------+-------+------+--------------------+--------------------+--------------+--------------+------+
|     1|2011-05-26|4006742|198.44|  Exercise & Fitness|Weightlifting Gloves|    Long Beach|    California|credit|
|     2|2011-06-01|4009775|  5.58|  Exercise & Fitness|Weightlifting Mac...|       Anaheim|    California|credit|
|     3|2011-06-05|4002199|198.19|          Gymnastics|    Gymnastics Rings|     Milwaukee|     Wisconsin|credit|
|     4|2011-12-17|4002613| 98.81|         Team Sports|        Field Hockey|   Nashville  |     Tennessee|credit|
|     5|2011-02-14|4007591|193.63|  Outdoor Recreation|Camping & Backpac...|       Chicago|      Illinois|credit|
|     6|2011-10-28|4002190| 27.89|             Puzzles|      Jigsaw Puzzles|    Charlest

In [14]:
# FAILFAST mode raises an error and does not proceed as soon as at least one field
# does not match its declared data type.
# This statement itself completes; the error appears once an action runs on tnx_df.
# Cause recorded by the original run: java.lang.NumberFormatException
tnx_df = (
    spark.read
    .schema(tnx_schema)
    .options(header=True, mode="FAILFAST", dateFormat="MM-dd-yyyy")
    .csv("/content/txns_with_header_alter.csv")
)

In [15]:
# Running an action on the FAILFAST read is expected to fail on the malformed row.
# Catch the error and print a one-line summary so the failure is still demonstrated
# without stopping a top-to-bottom run of the notebook.
try:
    tnx_df.show()
except Exception as err:  # the recorded run surfaced this as a Py4JJavaError
    message_lines = str(err).splitlines()
    print(type(err).__name__, "-", message_lines[0] if message_lines else "(no message)")

Py4JJavaError: ignored

In [34]:
from pyspark.sql.functions import col

In [58]:
# Same columns as tnx_schema plus one trailing string column, "_corrupt_records",
# which the reader is later told to use for the raw text of malformed rows.
# Derived from tnx_schema so the two schemas cannot drift apart.
tnx_schema1 = StructType(
    tnx_schema.fields + [StructField("_corrupt_records", StringType())]
)

In [59]:
# PERMISSIVE read that additionally routes the raw text of malformed rows into the
# "_corrupt_records" column declared in tnx_schema1.
tnx_df2 = (
    spark.read
    .schema(tnx_schema1)
    .options(
        header=True,
        mode="PERMISSIVE",
        dateFormat="MM-dd-yyyy",
        columnNameOfCorruptRecord="_corrupt_records",
    )
    .csv("/content/txns_with_header_alter.csv")
)



In [60]:
# Show the rows without truncating cell contents, so the raw malformed line stored in
# _corrupt_records is fully visible.
tnx_df2.show(truncate=False)

+------+----------+-------+------+----------------------+---------------------------------+--------------+--------------+------+--------------------------------------------------------------------------------------------------------------+
|tnx_id|tnx_dt    |cid    |amount|prod_cat              |prod                             |city          |state         |mode  |_corrupt_records                                                                                              |
+------+----------+-------+------+----------------------+---------------------------------+--------------+--------------+------+--------------------------------------------------------------------------------------------------------------+
|0     |2011-06-26|4007024|null  |Exercise & Fitness    |Cardio Machine Accessories       |Clarksville   |Tennessee     |credit|00000000,06-26-2011,4007024,04a0.33,Exercise & Fitness,Cardio Machine Accessories,Clarksville,Tennessee,credit|
|1     |2011-05-26|4006742|198.44|Exerci

In [61]:
# List only the rows the reader flagged as malformed.
# NOTE(review): the recorded run of this query ended in a Py4JJavaError (traceback not kept).
# Spark's migration guide says to cache (or save) the parsed result before filtering a raw
# CSV/JSON read on its corrupt-record column, so the filter is applied to the cached
# DataFrame here — verify on re-run.
tnx_df2.cache().where("_corrupt_records is not null").show()

Py4JJavaError: ignored