# ANALYSIS OF RETURNS DATA USING RDD

In [1]:
sc = SparkContext.getOrCreate()

In [2]:
sc.version

u'2.1.0'

Using RDD APIs, we will explore the following steps:

    Reading text files
    Parse records and associate with a schema
    Filtering of records
    Grouping of records
    Sorting by key and values
    Applying map and reduce functions
    Caching RDDs
    Applying groupByKey() and mapValues() functions
    Collecting data into driver
    Joining RDDs
    Saving RDDs into files
    Using Broadcast variables
    Using Accumulators


In [3]:
sc.pythonVer

'2.7'

In [30]:
returns_list = sc.textFile("file:///home/vijay/BPF/data/RETURN1.csv")

In [31]:
returns_list.take(2)

[u'6797,01/07/2016,56390275,1,2,10,8.33,RED WINE AUSTRALIAN,W42JE,GONE OFF',
 u'6797,01/07/2016,56390275,1,2,10,8.33,W42GBBM2 RED AUSTRALIA,W42GB,GONE OFF']

In [32]:
#returns_list = returns_list.map(lambda x: x.encode("ascii", "ignore").split(','))

In [33]:
returns_list.take(25)

[u'6797,01/07/2016,56390275,1,2,10,8.33,RED WINE AUSTRALIAN,W42JE,GONE OFF',
 u'6797,01/07/2016,56390275,1,2,10,8.33,W42GBBM2 RED AUSTRALIA,W42GB,GONE OFF',
 u'6797,01/07/2016,56390275,1,2,10,8.33,BMI RED AUSTRALIA,W42DB,GONE OFF',
 u'6797,01/07/2016,56390275,1,2,10,8.33,BM2 RED AUSTRALIA,W42GB,GONE OFF',
 u'2800,02/07/2016,73939529,3,1,9,7.5,CHAMPAGNE,W47DA,BROKEN INSIDE',
 u'3063,02/07/2016,53489550,3,2,10,8.33,W42GBBM2 RED AUSTRALIA,W42GB,BROKEN INSIDE',
 u'3063,02/07/2016,53489550,3,2,10,8.33,BMI RED AUSTRALIA,W42DB,BROKEN INSIDE',
 u'3063,02/07/2016,53489550,3,2,10,8.33,BM2 RED AUSTRALIA,W42GB,BROKEN INSIDE',
 u'3063,02/07/2016,53489550,3,2,10,8.33,RED WINE AUSTRALIAN,W42JE,BROKEN INSIDE',
 u'3063,02/07/2016,51926365,3,1,5.5,4.58,WHITE WINE AUSTRALIAN,W43JE,BROKEN INSIDE',
 u'3063,02/07/2016,51926365,3,1,5.5,4.58,W43GBBM2 WHITE AUSTRALIA,W43GB,BROKEN INSIDE',
 u'3063,02/07/2016,51926365,3,1,5.5,4.58,BM2 WHITE AUSTRALIA,W43GB,BROKEN INSIDE',
 u'3429,02/07/2016,81091764,3,1,6,5,WHIT

In [34]:
supplier_list = sc.textFile("file:///home/vijay/BPF/data/SUPPLIER1.csv" )

In [35]:
supplier_list.take(2)

[u' ARGENTINA#0A,2800,63896749', u' ARGENTINA#0A,2800,68024093']

In [36]:
type(supplier_list.take(2))

list

In [37]:
type(supplier_list)

pyspark.rdd.RDD

In [38]:
returns_list.count()

4514

In [39]:
returns_fields = ['LOCATION','CALENDAR_DATE','BASE_PRODUCT_NUMBER','TXN_TYPE','RETURN_SNGL','RETURN_VALUE','RETURN_VALUE_XVAT','PRODUCT_SUB_GROUP_DESCRIPTION','PRODUCT_SUB_GROUP_CODE','REASON'
]

In [40]:
from collections import namedtuple

In [41]:
returns_rows = namedtuple('returns_rows',returns_fields)

In [42]:
returns_rows

collections.returns_rows

In [43]:
from datetime import datetime

In [44]:
datetime.strptime("24052010", "%d%m%Y").date()

datetime.date(2010, 5, 24)

In [86]:
# Define a function to parse each line and convert them into records
def parseRec(line):
    columns = line.split(",")
    return returns_rows(int(columns[0]),str(columns[1]),int(columns[2]),int(columns[3]),int(columns[4]),float(columns[5]),float(columns[6]),columns[7],
                          columns[8],columns[9])

In [87]:
returns_rdd = returns_list.map(lambda rec: parseRec(rec))

In [88]:
returns_list.take(2)

[u'6797,01/07/2016,56390275,1,2,10,8.33,RED WINE AUSTRALIAN,W42JE,GONE OFF',
 u'6797,01/07/2016,56390275,1,2,10,8.33,W42GBBM2 RED AUSTRALIA,W42GB,GONE OFF']

In [89]:
returns_rdd.take(5)

[returns_rows(LOCATION=6797, CALENDAR_DATE='01/07/2016', BASE_PRODUCT_NUMBER=56390275, TXN_TYPE=1, RETURN_SNGL=2, RETURN_VALUE=10.0, RETURN_VALUE_XVAT=8.33, PRODUCT_SUB_GROUP_DESCRIPTION=u'RED WINE AUSTRALIAN', PRODUCT_SUB_GROUP_CODE=u'W42JE', REASON=u'GONE OFF'),
 returns_rows(LOCATION=6797, CALENDAR_DATE='01/07/2016', BASE_PRODUCT_NUMBER=56390275, TXN_TYPE=1, RETURN_SNGL=2, RETURN_VALUE=10.0, RETURN_VALUE_XVAT=8.33, PRODUCT_SUB_GROUP_DESCRIPTION=u'W42GBBM2 RED AUSTRALIA', PRODUCT_SUB_GROUP_CODE=u'W42GB', REASON=u'GONE OFF'),
 returns_rows(LOCATION=6797, CALENDAR_DATE='01/07/2016', BASE_PRODUCT_NUMBER=56390275, TXN_TYPE=1, RETURN_SNGL=2, RETURN_VALUE=10.0, RETURN_VALUE_XVAT=8.33, PRODUCT_SUB_GROUP_DESCRIPTION=u'BMI RED AUSTRALIA', PRODUCT_SUB_GROUP_CODE=u'W42DB', REASON=u'GONE OFF'),
 returns_rows(LOCATION=6797, CALENDAR_DATE='01/07/2016', BASE_PRODUCT_NUMBER=56390275, TXN_TYPE=1, RETURN_SNGL=2, RETURN_VALUE=10.0, RETURN_VALUE_XVAT=8.33, PRODUCT_SUB_GROUP_DESCRIPTION=u'BM2 RED AUSTRAL

In [162]:
#help(namedtuple) #similar to structs

In [50]:
type(returns_rdd)

pyspark.rdd.PipelinedRDD

In [51]:
ret_10_OrMore_rdd = returns_rdd.filter(lambda rec: rec.RETURN_SNGL > 40)

In [52]:
ret_10_OrMore_rdd.count()

5

In [53]:
ret_10_OrMore_rdd.take(6)

[returns_rows(LOCATION=u'2800', CALENDAR_DATE=u'11/07/2016', BASE_PRODUCT_NUMBER=u'66557421', TXN_TYPE=u'3', RETURN_SNGL=45, RETURN_VALUE=247.5, RETURN_VALUE_XVAT=206.25, PRODUCT_SUB_GROUP_DESCRIPTION=u'W45ADAB ROSE', PRODUCT_SUB_GROUP_CODE=u'W45AD', REASON=u'BROKEN INSIDE'),
 returns_rows(LOCATION=u'2800', CALENDAR_DATE=u'11/07/2016', BASE_PRODUCT_NUMBER=u'66557421', TXN_TYPE=u'3', RETURN_SNGL=45, RETURN_VALUE=247.5, RETURN_VALUE_XVAT=206.25, PRODUCT_SUB_GROUP_DESCRIPTION=u'ROSE WINE', PRODUCT_SUB_GROUP_CODE=u'W45AE', REASON=u'BROKEN INSIDE'),
 returns_rows(LOCATION=u'2800', CALENDAR_DATE=u'11/07/2016', BASE_PRODUCT_NUMBER=u'66557421', TXN_TYPE=u'3', RETURN_SNGL=45, RETURN_VALUE=247.5, RETURN_VALUE_XVAT=206.25, PRODUCT_SUB_GROUP_DESCRIPTION=u'AB ROSE', PRODUCT_SUB_GROUP_CODE=u'W45AD', REASON=u'BROKEN INSIDE'),
 returns_rows(LOCATION=u'2800', CALENDAR_DATE=u'11/07/2016', BASE_PRODUCT_NUMBER=u'66557421', TXN_TYPE=u'3', RETURN_SNGL=45, RETURN_VALUE=247.5, RETURN_VALUE_XVAT=206.25, PRODUC

In [220]:
x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
map((lambda (x,y): (x, list(y))), sorted(x.groupByKey().collect()))

[('a', [1, 1]), ('b', [1])]

In [221]:
x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
#def fun(x): return x

In [222]:
x.flatMapValues(f).collect()

[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]

In [223]:
w = sc.parallelize([("a", 5), ("b", 6)])
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
z = sc.parallelize([("b", 42)])
map((lambda (x,y): (x, (list(y[0]), list(y[1]), list(y[2]), list(y[3])))), 
    sorted(list(w.groupWith(x, y, z).collect())))

[('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))]

In [224]:
rdd = sc.parallelize([1, 2, 3, 4], 2)

In [228]:
rdd.collect()

[1, 2, 3, 4]

In [225]:
def funsum(iterator): yield sum(iterator)

In [227]:
rdd.mapPartitions(funsum).collect()

[3, 7]

In [229]:
rdd.getNumPartitions()

2

In [230]:
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.filter(lambda x: x % 2 == 0).collect()

[2, 4]

In [231]:
sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())

[1, 2, 3]

In [234]:
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
result = rdd.groupBy(lambda x: x + x).collect()
sorted([(x, sorted(y)) for (x, y) in result])

[(2, [1, 1]), (4, [2]), (6, [3]), (10, [5]), (16, [8])]

In [235]:
#https://spark.apache.org/docs/1.1.1/api/python/pyspark.rdd.RDD-class.html#sum

RDD Creation

In this section, we will introduce two different ways of getting data into the basic Spark data structure, the Resilient Distributed Dataset or RDD. An RDD is a distributed collection of elements. All work in Spark is expressed as either creating new RDDs, transforming existing RDDs, or calling actions on RDDs to compute a result.Spark automatically distributes the data contained in RDDs across your cluster and parallelizes the operations you perform on them.

In [236]:
# RDD Basic Operations

This section will introduce three basic but essential Spark operations.
Two of them are the transformations map and filter. The other is the action collect.
At the same time we will introduce the concept of persistence in Spark

In [29]:
ret_10_OrMore_rdd = returns_rdd.filter(lambda rec: rec.RETURN_SNGL > 40)

NameError: name 'returns_rdd' is not defined

In [240]:
from time import time
t0 = time()
normal_count = ret_10_OrMore_rdd.count()
tt = time() - t0
print "There are {} 'more than 40 returned singles".format(normal_count)
print "Count completed in {} seconds".format(round(tt,3))

There are 5 'more than 40 returned singles
Count completed in 0.329 seconds


In [241]:
#By using the map transformation in Spark, we can apply a function to every element in our RDD

In [249]:
from pprint import pprint
csv_data = returns_list.map(lambda x: x.split(","))
t0 = time()
head_rows = csv_data.take(1)
tt = time() - t0
print "Parse completed in {} seconds".format(round(tt,3))
pprint(head_rows[0])

Parse completed in 0.123 seconds
[u'6797',
 u'01/07/2016',
 u'56390275',
 u'1',
 u'2',
 u'10',
 u'8.33',
 u'RED WINE AUSTRALIAN',
 u'W42JE',
 u'GONE OFF']


In [288]:
#Using map with Predefined Functions
def parse_Line(line):
    elem = line.split(",")
    tag = elem[2]  # these are useful when doing count
    return (tag,elem)

In [289]:
key_csv_data = returns_list.map(parse_Line)
head_rows = key_csv_data.take(1)
pprint(head_rows[0])

(u'56390275',
 [u'6797',
  u'01/07/2016',
  u'56390275',
  u'1',
  u'2',
  u'10',
  u'8.33',
  u'RED WINE AUSTRALIAN',
  u'W42JE',
  u'GONE OFF'])


In [304]:
head_rows

[(u'56390275',
  [u'6797',
   u'01/07/2016',
   u'56390275',
   u'1',
   u'2',
   u'10',
   u'8.33',
   u'RED WINE AUSTRALIAN',
   u'W42JE',
   u'GONE OFF'])]

 The collect Action

So far we have used the actions count and take. Another basic action we need to learn is collect.
Basically it will get all the elements in the RDD into memory for us to work with them. For this 
reason it has to be used with care, specially when working with large RDDs

In [305]:
t0 = time()
all_raw_data = returns_list.collect()
tt = time() - t0
print "Data collected in {} seconds".format(round(tt,3))

Data collected in 0.201 seconds


In [306]:
t0 = time()
all_raw_data = returns_list.take(50000)
tt = time() - t0
print "Data collected in {} seconds".format(round(tt,3))

Data collected in 0.21 seconds


# CONVERT RDD TO DF  USING STRUCT FIELDS

In [1]:
import sys
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, HiveContext
import os
from pyspark.sql.functions import *
import pyspark.storagelevel 
import numpy as np
import pandas as pd
import datetime
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pdb
import subprocess # Used for executing linux commands, for writing to teradata
import sys

from pyspark.sql.types import *
from pyspark.sql.functions import col, udf, min, max
from datetime import datetime, timedelta

sqlContext = SQLContext(sc)
sqlContext = HiveContext(sc)

In [4]:
returns_list = sc.textFile("file:///home/vijay/BPF/data/RETURN1.csv")

In [14]:
returns_list.take(3)

[u'6797,01/07/2016,56390275,1,2,10,8.33,RED WINE AUSTRALIAN,W42JE,GONE OFF',
 u'6797,01/07/2016,56390275,1,2,10,8.33,W42GBBM2 RED AUSTRALIA,W42GB,GONE OFF',
 u'6797,01/07/2016,56390275,1,2,10,8.33,BMI RED AUSTRALIA,W42DB,GONE OFF']

In [162]:
nws_schema = StructType([StructField("LOCATION",IntegerType(),True),
                         StructField("CALENDAR_DATE",DateType(),True),
                         StructField("BASE_PRODUCT_NUMBER",IntegerType(),True),
                         StructField("TXN_TYPE",DoubleType(),True),
                         StructField("RETURN_SNGL",DoubleType(),True),
                         StructField("RETURN_VALUE",DoubleType(),True),
                         StructField("RETURN_VALUE_XVAT",DoubleType(),True),
                         StructField("PRODUCT_SUB_GROUP_DESCRIPTION",StringType(),True),
                         StructField("PRODUCT_SUB_GROUP_CODE",StringType(),True),                                                                    
                         StructField("REASON",StringType(),True)])


In [163]:
returns_df = sqlContext.createDataFrame(returns_rdd,nws_schema)

In [164]:
returns_df.show(2)

+--------+-------------+-------------------+--------+-----------+------------+-----------------+-----------------------------+----------------------+--------+
|LOCATION|CALENDAR_DATE|BASE_PRODUCT_NUMBER|TXN_TYPE|RETURN_SNGL|RETURN_VALUE|RETURN_VALUE_XVAT|PRODUCT_SUB_GROUP_DESCRIPTION|PRODUCT_SUB_GROUP_CODE|  REASON|
+--------+-------------+-------------------+--------+-----------+------------+-----------------+-----------------------------+----------------------+--------+
|    6797|   2016-01-07|           56390275|     1.0|        2.0|        10.0|             8.33|          RED WINE AUSTRALIAN|                 W42JE|GONE OFF|
|    6797|   2016-01-07|           56390275|     1.0|        2.0|        10.0|             8.33|         W42GBBM2 RED AUST...|                 W42GB|GONE OFF|
+--------+-------------+-------------------+--------+-----------+------------+-----------------+-----------------------------+----------------------+--------+
only showing top 2 rows



In [183]:
returns_rdd.take(3)

[returns_rows(LOCATION=6797, CALENDAR_DATE=datetime.datetime(2016, 1, 7, 0, 0), BASE_PRODUCT_NUMBER=56390275, TXN_TYPE=1.0, RETURN_SNGL=2.0, RETURN_VALUE=10.0, RETURN_VALUE_XVAT=8.33, PRODUCT_SUB_GROUP_DESCRIPTION='RED WINE AUSTRALIAN', PRODUCT_SUB_GROUP_CODE='W42JE', REASON='GONE OFF'),
 returns_rows(LOCATION=6797, CALENDAR_DATE=datetime.datetime(2016, 1, 7, 0, 0), BASE_PRODUCT_NUMBER=56390275, TXN_TYPE=1.0, RETURN_SNGL=2.0, RETURN_VALUE=10.0, RETURN_VALUE_XVAT=8.33, PRODUCT_SUB_GROUP_DESCRIPTION='W42GBBM2 RED AUSTRALIA', PRODUCT_SUB_GROUP_CODE='W42GB', REASON='GONE OFF'),
 returns_rows(LOCATION=6797, CALENDAR_DATE=datetime.datetime(2016, 1, 7, 0, 0), BASE_PRODUCT_NUMBER=56390275, TXN_TYPE=1.0, RETURN_SNGL=2.0, RETURN_VALUE=10.0, RETURN_VALUE_XVAT=8.33, PRODUCT_SUB_GROUP_DESCRIPTION='BMI RED AUSTRALIA', PRODUCT_SUB_GROUP_CODE='W42DB', REASON='GONE OFF')]

In [130]:
import datetime
# Define a function to parse each line and convert them into records
def parseRec(line):
    columns = line.split(",")
    return returns_rows(int(columns[0]),str(columns[1]),int(columns[2]),int(columns[3]),int(columns[4]),float(columns[5]),float(columns[6]),columns[7],
                          columns[8],columns[9])

In [131]:
returns_rdd = returns_list.map(lambda rec: parseRec(rec))

In [132]:
returns_rdd.take(2)

[returns_rows(LOCATION=6797, CALENDAR_DATE='01/07/2016', BASE_PRODUCT_NUMBER=56390275, TXN_TYPE=1, RETURN_SNGL=2, RETURN_VALUE=10.0, RETURN_VALUE_XVAT=8.33, PRODUCT_SUB_GROUP_DESCRIPTION=u'RED WINE AUSTRALIAN', PRODUCT_SUB_GROUP_CODE=u'W42JE', REASON=u'GONE OFF'),
 returns_rows(LOCATION=6797, CALENDAR_DATE='01/07/2016', BASE_PRODUCT_NUMBER=56390275, TXN_TYPE=1, RETURN_SNGL=2, RETURN_VALUE=10.0, RETURN_VALUE_XVAT=8.33, PRODUCT_SUB_GROUP_DESCRIPTION=u'W42GBBM2 RED AUSTRALIA', PRODUCT_SUB_GROUP_CODE=u'W42GB', REASON=u'GONE OFF')]

# TAKE 2 : bringing all that works together
1) Parsing method returns data or RDD of specific type values

2) Used structType to enforce schema and create df


THIS WOULD BE VERY EASY WITH DATABRICKS CSV READER

In [14]:
import sys
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, HiveContext
import os
from pyspark.sql.functions import *
import pyspark.storagelevel 
import numpy as np
import pandas as pd
import datetime
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pdb
import subprocess # Used for executing linux commands, for writing to teradata
import sys

from pyspark.sql.types import *
from pyspark.sql.functions import col, udf, min, max
from datetime import datetime, timedelta

sqlContext = SQLContext(sc)
sqlContext = HiveContext(sc)

In [15]:
returns_rdd1 = sc.textFile("file:///home/vijay/BPF/data/RETURN1.csv")
returns_rdd1.count()

4514

In [16]:
returns_rdd1.take(2)

[u'6797,01/07/2016,56390275,1,2,10,8.33,RED WINE AUSTRALIAN,W42JE,GONE OFF',
 u'6797,01/07/2016,56390275,1,2,10,8.33,W42GBBM2 RED AUSTRALIA,W42GB,GONE OFF']

In [17]:
from collections import namedtuple
returns_fields = ['LOCATION','CALENDAR_DATE','BASE_PRODUCT_NUMBER','TXN_TYPE','RETURN_SNGL','RETURN_VALUE','RETURN_VALUE_XVAT','PRODUCT_SUB_GROUP_DESCRIPTION','PRODUCT_SUB_GROUP_CODE','REASON'
]
returns_rows = namedtuple('returns_rows',returns_fields)
returns_rows #can access field names on this

collections.returns_rows

In [18]:
import datetime
from datetime import datetime
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DateType
# Define a function to parse each line and convert them into records
def parseRec(line):
    columns = line.split(",")
    return returns_rows(int(columns[0]),datetime.strptime((columns[1]), '%m/%d/%Y'),int(columns[2]),float(columns[3]),float(columns[4]),float(columns[5]),float(columns[6]),str(columns[7]),
                          str(columns[8]),str(columns[9]))

In [19]:
returns_rdd = returns_rdd1.map(lambda rec: parseRec(rec))

In [20]:
returns_rdd.take(2)

[returns_rows(LOCATION=6797, CALENDAR_DATE=datetime.datetime(2016, 1, 7, 0, 0), BASE_PRODUCT_NUMBER=56390275, TXN_TYPE=1.0, RETURN_SNGL=2.0, RETURN_VALUE=10.0, RETURN_VALUE_XVAT=8.33, PRODUCT_SUB_GROUP_DESCRIPTION='RED WINE AUSTRALIAN', PRODUCT_SUB_GROUP_CODE='W42JE', REASON='GONE OFF'),
 returns_rows(LOCATION=6797, CALENDAR_DATE=datetime.datetime(2016, 1, 7, 0, 0), BASE_PRODUCT_NUMBER=56390275, TXN_TYPE=1.0, RETURN_SNGL=2.0, RETURN_VALUE=10.0, RETURN_VALUE_XVAT=8.33, PRODUCT_SUB_GROUP_DESCRIPTION='W42GBBM2 RED AUSTRALIA', PRODUCT_SUB_GROUP_CODE='W42GB', REASON='GONE OFF')]

In [21]:
ret_schema = StructType([StructField("LOCATION",IntegerType(),True),
                         StructField("CALENDAR_DATE",DateType(),True),
                         StructField("BASE_PRODUCT_NUMBER",IntegerType(),True),
                         StructField("TXN_TYPE",DoubleType(),True),
                         StructField("RETURN_SNGL",DoubleType(),True),
                         StructField("RETURN_VALUE",DoubleType(),True),
                         StructField("RETURN_VALUE_XVAT",DoubleType(),True),
                         StructField("PRODUCT_SUB_GROUP_DESCRIPTION",StringType(),True),
                         StructField("PRODUCT_SUB_GROUP_CODE",StringType(),True),                                                                    
                         StructField("REASON",StringType(),True)])

# Create a data frame

In [22]:
returns_df = sqlContext.createDataFrame(returns_rdd,ret_schema)

In [23]:
returns_df.show(3)

+--------+-------------+-------------------+--------+-----------+------------+-----------------+-----------------------------+----------------------+--------+
|LOCATION|CALENDAR_DATE|BASE_PRODUCT_NUMBER|TXN_TYPE|RETURN_SNGL|RETURN_VALUE|RETURN_VALUE_XVAT|PRODUCT_SUB_GROUP_DESCRIPTION|PRODUCT_SUB_GROUP_CODE|  REASON|
+--------+-------------+-------------------+--------+-----------+------------+-----------------+-----------------------------+----------------------+--------+
|    6797|   2016-01-07|           56390275|     1.0|        2.0|        10.0|             8.33|          RED WINE AUSTRALIAN|                 W42JE|GONE OFF|
|    6797|   2016-01-07|           56390275|     1.0|        2.0|        10.0|             8.33|         W42GBBM2 RED AUST...|                 W42GB|GONE OFF|
|    6797|   2016-01-07|           56390275|     1.0|        2.0|        10.0|             8.33|            BMI RED AUSTRALIA|                 W42DB|GONE OFF|
+--------+-------------+-------------------+--

In [25]:
returns_df.printSchema()

root
 |-- LOCATION: integer (nullable = true)
 |-- CALENDAR_DATE: date (nullable = true)
 |-- BASE_PRODUCT_NUMBER: integer (nullable = true)
 |-- TXN_TYPE: double (nullable = true)
 |-- RETURN_SNGL: double (nullable = true)
 |-- RETURN_VALUE: double (nullable = true)
 |-- RETURN_VALUE_XVAT: double (nullable = true)
 |-- PRODUCT_SUB_GROUP_DESCRIPTION: string (nullable = true)
 |-- PRODUCT_SUB_GROUP_CODE: string (nullable = true)
 |-- REASON: string (nullable = true)

