Best Practices

Best Practices 1 -> Broadcast variable on RDD

In [3]:
size = 2 * 1000 * 1000
m = list(range(size))  # range() instead of xrange(), which does not exist in Python 3

In [4]:
m

In [5]:
len(m)

In [6]:
import sys
# getsizeof() reports the list object only, not the integers it references
print(sys.getsizeof(m) // 1000 // 1000, "Megabytes")
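
The number printed by `sys.getsizeof` covers only the list's own reference array, so it is not the number of bytes that would travel to the executors. A minimal sketch of a second estimate, assuming the pickled size is a reasonable proxy for what gets serialized (the names `shallow_bytes` and `pickled_bytes` are illustrative only):

```python
import pickle
import sys

size = 2 * 1000 * 1000
m = list(range(size))

# Shallow size: the list object and its array of references only.
shallow_bytes = sys.getsizeof(m)

# Pickled size: used here as a proxy (an assumption) for the bytes
# serialized when m has to travel along with a task.
pickled_bytes = len(pickle.dumps(m, protocol=pickle.HIGHEST_PROTOCOL))

print("shallow: %.1f MB" % (shallow_bytes / 1e6))
print("pickled: %.1f MB" % (pickled_bytes / 1e6))
```

The two figures measure different things, so either one is best read as an order-of-magnitude hint rather than an exact cost.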

In [7]:
rdd=sc.parallelize([1,2,3,4,5], numSlices=5)
rdd.collect()

In [8]:
print(rdd.getNumPartitions(), "partitions")

In [9]:
for i in range(5):
  rdd.map(lambda x: len(m) * x).collect()


In [10]:
broadcastVar=sc.broadcast(m)

In [11]:
for i in range(5):
  rdd.map(lambda x: len(broadcastVar.value) * x).collect()
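
Why the broadcast version helps can be sketched with simple arithmetic. This is a rough model, not a measurement: it assumes that a list captured in a lambda is serialized again for each of the five jobs in the loop above, while a broadcast value is serialized once and reused. Pickled size stands in for the serialized payload.

```python
import pickle

m = list(range(2 * 1000 * 1000))
payload = len(pickle.dumps(m, protocol=pickle.HIGHEST_PROTOCOL))

num_jobs = 5  # the loops above each run five collect() jobs

# Closure capture: m is serialized again for every job (modelling assumption).
closure_bytes = payload * num_jobs

# Explicit broadcast: m is serialized once and reused by later jobs.
broadcast_bytes = payload

print("closure:   %.1f MB serialized" % (closure_bytes / 1e6))
print("broadcast: %.1f MB serialized" % (broadcast_bytes / 1e6))
```

The real saving depends on cluster size and Spark version; the Spark UI is the place to confirm it.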

Broadcast variable on DataFrame

In [13]:
from pyspark.sql.functions import broadcast

In [14]:
airport_df = spark.read.csv("/mnt/lp-dataset/airports.dat").toDF("airportID", "name", "city", "country", "IATA", "ICAO", "latitude", "longitude", "altitude", "timezone", "dst", "tz","type","source")

In [15]:
airport_df.count()

In [16]:
len(airport_df.columns)

In [17]:
airport_df.printSchema()

In [18]:
airport_df.show()

In [19]:
airport_df.count()

In [20]:
airport_df.printSchema()

In [21]:
country_df = spark.read.csv("/mnt/lp-dataset/country.csv",header = True)

In [22]:
country_df.count()

In [23]:
country_df.printSchema()

In [24]:
joined_df_without_broadcast = airport_df.join(country_df, airport_df.country == country_df.country).drop(country_df.country)

In [25]:
joined_df_without_broadcast.show()

In [26]:
joined_df_with_broadcast = airport_df.join(broadcast(country_df), airport_df.country == country_df.country).drop(country_df.country)

In [27]:
joined_df_with_broadcast.show()

In [28]:
joined_df_with_broadcast.printSchema()

Best Practices 2 -> Level of Parallelism

In [30]:
wordList = ["cat", "cat", "fish", "dog", "fish"]

In [31]:
wordsRDD = sc.parallelize(wordList, 1)

In [32]:
wordsRDD.count()

In [33]:
wordsRDD.getNumPartitions()

In [34]:
wordsRDD.glom().collect()

In [35]:
wordsRDD = sc.parallelize(wordList, 4)

In [36]:
wordsRDD.getNumPartitions()

In [37]:
wordsRDD.glom().collect()

In [38]:
for (p, i) in wordsRDD.glom().zipWithIndex().collect():
  print('partition {} contains {}  and its length is {}'.format(i, p,len(p)))
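
The partition sizes seen above can be reproduced without a cluster. The sketch below assumes that `sc.parallelize` cuts a local list at evenly spaced index positions; `slice_positions` and `split_into_slices` are hypothetical helper names, and the real layout should still be confirmed with `glom()`.

```python
def slice_positions(length, num_slices):
    """Yield (start, end) index pairs splitting `length` items into `num_slices` runs."""
    for i in range(num_slices):
        yield (i * length) // num_slices, ((i + 1) * length) // num_slices


def split_into_slices(items, num_slices):
    """Return the sub-lists that each slice would hold under the assumed rule."""
    return [items[start:end] for start, end in slice_positions(len(items), num_slices)]


wordList = ["cat", "cat", "fish", "dog", "fish"]
for i, p in enumerate(split_into_slices(wordList, 4)):
    print('partition {} contains {} and its length is {}'.format(i, p, len(p)))
```

With five items and four slices the remainder lands in the last slice, which is why partitions are not always equal in length.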

In [39]:
rdd = sc.textFile("/mnt/lp-dataset/spark_doc.txt")
words = rdd.flatMap(lambda x: x.split(' '))
work_1 = words.map(lambda s: (s, 1))
word_count = work_1.reduceByKey(lambda x, y: x + y)
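
What the `flatMap` / `map` / `reduceByKey` chain computes can be checked on a small input in plain Python. The sample lines are hypothetical stand-ins for `spark_doc.txt`; the second one contains a double space on purpose.

```python
from collections import Counter

# Hypothetical sample lines standing in for spark_doc.txt.
lines = ["spark is fast", "spark is  general"]

# flatMap: split every line on a single space and flatten the results.
words = [w for line in lines for w in line.split(' ')]

# map + reduceByKey: pair each word with 1 and sum the 1s per word.
word_count = Counter()
for w in words:
    word_count[w] += 1

print(sorted(word_count.items()))
```

Because `split(' ')` splits on every single space, consecutive spaces produce empty strings, and those are counted as a word too. Calling `split()` with no argument avoids that if it is not wanted.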

In [40]:
word_count.collect()

In [41]:
word_count.count()

In [42]:
word_count.cache()

In [43]:
word_count.count()

In [44]:
word_count.take(2)

In [45]:
work_1.cache()

In [46]:
word_count.cache()

In [47]:
word_count.count()

In [48]:
word_count.take(2)

Best Practices 3 -> Data Serialization

In [50]:
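from pyspark.sql import SparkSession
# spark.serializer is read when the SparkContext starts; if a session already exists,
# getOrCreate() returns it and this setting does not change the serializer in use.
spark = SparkSession.builder.appName("KryoSerializer apps").config("spark.serializer","org.apache.spark.serializer.KryoSerializer").getOrCreate()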

# Scala equivalent:
# val spark = SparkSession.builder().appName("KryoSerializer app").config("spark.serializer","org.apache.spark.serializer.KryoSerializer").getOrCreate()


Best Practices 4 -> Data Locality

Best Practices 5 -> Filter data before processing
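
A minimal plain-Python sketch of the idea, using hypothetical records and a hypothetical `expensive` transformation that counts how often it runs. Filtering first means the costly step touches only the rows that survive.

```python
# Hypothetical records: (financial_year, state, expenditure in millions).
records = [
    ("1997-98", "NSW", 100.0),
    ("1997-98", "VIC", 80.0),
    ("1998-99", "NSW", 110.0),
    ("1998-99", "VIC", 90.0),
]

calls = {"late": 0, "early": 0}


def expensive(record, key):
    """Stand-in for a costly transformation; counts how often it runs."""
    calls[key] += 1
    year, state, amount = record
    return (year, state, amount * 2)


# Filter late: transform every record, then discard most of them.
late = [r for r in (expensive(rec, "late") for rec in records) if r[0] == "1997-98"]

# Filter early: discard first, then transform only what is needed.
early = [expensive(rec, "early") for rec in records if rec[0] == "1997-98"]

print(calls)
```

Both orderings give the same result here; only the amount of work differs. In Spark the same principle suggests applying `filter` before wide operations such as joins and aggregations.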

Best Practices 6 -> Use Parquet, Avro, or ORC file formats instead of flat files or CSV

In [54]:
rawData=spark.read.csv("/mnt/lp-dataset/healthexpenditurebyareaandsource.csv",header = True)

In [55]:
rawData.show()

Parquet

In [57]:
rawData.write.parquet("/mnt/lp-dataset/healthexpend3.parquet")

In [58]:
parquet_df = spark.read.parquet("/mnt/lp-dataset/healthexpend3.parquet")

In [59]:
parquet_df.show()

In [60]:
parquet_df.createOrReplaceTempView("health_expenditur_table")

In [61]:
few_columns_data = spark.sql("SELECT financial_year, state, real_expenditure_millions FROM health_expenditur_table")

In [62]:
few_columns_data.show()

Avro

In [64]:
rawData.write.format("com.databricks.spark.avro").save("/mnt/lp-dataset/avro5")

In [65]:
avro_df = spark.read.format("com.databricks.spark.avro").load("/mnt/lp-dataset/avro5/")

In [66]:
avro_df.show()

In [67]:
avro_df.createOrReplaceTempView("health_expenditur_avro_table")

In [68]:
# Spending by state (the source column is in millions, so the total is too)
spark.sql("select state, sum(real_expenditure_millions) SpendingInMillions from health_expenditur_avro_table group by state order by SpendingInMillions desc").show()

Parquet vs Avro

Both formats are compressed and optimized for fast processing, so they take less space and are quicker to process than uncompressed flat files.

Both Parquet and Avro files hold their schema definition within the file. The schema can also be registered in a metadata application, where it can be updated whenever the schema definition changes.

Parquet is a column-based format: if the source contains many columns but we need only a few, Parquet reads just those columns. This makes it a good match for Spark SQL. Compared with Avro, writing Parquet is a bit slower, but reading is faster.

Avro is a row-based format. Use Avro when you want to retrieve whole records.

It is a serialization format.

It stores its schema, defined in JSON, alongside the data.

It handles schema evolution well.
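
The row-versus-column difference can be illustrated with plain Python containers. This is only a sketch of the layout idea, not of either file format; the records are hypothetical, and the `area` column is invented for illustration.

```python
# Hypothetical records; "area" is an invented extra column.
rows = [
    {"financial_year": "1997-98", "state": "NSW", "area": "A", "real_expenditure_millions": 100.0},
    {"financial_year": "1997-98", "state": "VIC", "area": "B", "real_expenditure_millions": 80.0},
]
names = list(rows[0])

# Row-oriented (Avro-like): each record is stored whole.
row_store = [tuple(r[n] for n in names) for r in rows]

# Column-oriented (Parquet-like): each column is stored contiguously.
column_store = {n: [r[n] for r in rows] for n in names}

wanted = ["state", "real_expenditure_millions"]

# Row layout: every field of every record is read to pick out two of them.
fields_read_row = len(row_store) * len(names)
from_rows = [tuple(rec[names.index(c)] for c in wanted) for rec in row_store]

# Column layout: only the requested columns are read.
fields_read_col = len(wanted) * len(rows)
from_cols = list(zip(*(column_store[c] for c in wanted)))

print(fields_read_row, fields_read_col)
```

Both layouts return the same answer, but the column layout touches fewer fields when only a few columns are selected. When whole records are needed, the row layout hands them over directly.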

Best Practices 7 -> Using partitionBy

In [71]:
avro_df.printSchema()

In [72]:
rawData.write.format("com.databricks.spark.avro").save("/mnt/lp-dataset/avro2/",partitionBy = "financial_year")

In [73]:
avro2_df = spark.read.format("com.databricks.spark.avro").load("/mnt/lp-dataset/avro2/")

In [74]:
avro2_df.filter(avro2_df['financial_year']=="1997-98").show()

In [75]:
# save() returns None, so there is nothing useful to assign here
rawData.write.format("com.databricks.spark.avro").save("/mnt/lp-dataset/avro3/",partitionBy = ["financial_year","state"])

In [76]:
avro_df.write.partitionBy("financial_year").parquet("/mnt/lp-dataset/parquet_partitionBy")
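
What `partitionBy` does to the output can be sketched in plain Python. The records are hypothetical and the dictionary stands in for the directory tree; the `column=value` directory naming is the layout Spark writes for partitioned output.

```python
from collections import defaultdict

# Hypothetical records: (financial_year, state, expenditure in millions).
records = [
    ("1997-98", "NSW", 100.0),
    ("1997-98", "VIC", 80.0),
    ("1998-99", "NSW", 110.0),
]

# partitionBy("financial_year"): one directory per distinct value, named column=value.
layout = defaultdict(list)
for year, state, amount in records:
    # The partition column lives in the directory name, not in the data rows.
    layout["financial_year={}".format(year)].append((state, amount))

# A filter on the partition column only needs the matching directory.
wanted_dir = "financial_year=1997-98"
scanned = layout[wanted_dir]

print(sorted(layout))
print(scanned)
```

This is why a filter such as `financial_year == "1997-98"` on partitioned data can skip the other directories entirely. Partitioning on a column with very many distinct values produces many small directories, so a low-cardinality column is usually the better choice.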