In [1]:
import findspark
import json
import os
findspark.init()

#Pre requisite for textblob
import nltk
nltk.download('punkt')

from textblob import TextBlob

[nltk_data] Downloading package punkt to C:\Users\Sanchit
[nltk_data]     Kumar\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [3]:
from pyspark import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql import functions

In [5]:
"""
Global variable. Inititalising Spark session.

"""
spark = SparkSession.builder.master('local').appName("Data Science Sandbox").getOrCreate()

In [7]:
"""
Class to create analytics Sandbox
"""
class Data_Science_Sandbox:
    
    config = None
    analytics_sandbox = None
    lake_path = None
    dirs = None
    
    def __init__(self):
        self.config = json.load(open("config.json"))
        self.analytics_sandbox = self.config['local_paths']['analytics_sandbox']
        self.lake_path = self.config['local_paths']['lake_path']
        self.dirs = [val[1] for val in os.walk(self.lake_path) if val[0] == self.lake_path][0]
        print(spark)
        
    def join_all(self):
        business_df = self.select_business_df()
        review_df = self.select_review_df()
        user_df = self.select_user_df()
        final_df = business_df.join(review_df , (business_df.business_id == review_df.business_id) , how = 'inner').drop(review_df.business_id)
        final_df = final_df.join(user_df , (final_df.user_id == user_df.user_id) , how = 'inner').drop(user_df.user_id)
        #final_df.show()
        self.write_to_file(final_df)
        
        
    def write_to_file(self, df):
        file_path = os.path.join(self.analytics_sandbox , 'Analytics_file')
        df.coalesce(1).write.csv(file_path , mode  = 'overwrite' , header = True)
        print("File saved at {}".format(self.analytics_sandbox))
        
    
    def select_business_df(self):
        df = spark.read.parquet(os.path.join(self.lake_path , self.dirs[0]))
        business_df = df.select(['business_id','name','categories','state','city','review_count','stars'])
        business_df = self.rename_columns(business_df , ['name','categories','stars'] , ['business_name','business_categories','business_stars'])
        return business_df
    
    def select_review_df(self):
        df = spark.read.parquet(os.path.join(self.lake_path , self.dirs[5]))
        review_df = df.select(['business_id','review_id','user_id','date','text','stars'])
        review_df = self.rename_columns(review_df , ['date','stars'] , ['review_date','review_stars'])
        review_df.show()
        #return review_df
    
    def select_user_df(self):
        df = spark.read.parquet(os.path.join(self.lake_path , self.dirs[7]))
        user_df = df.select(['user_id','name','review_count'])
        user_df = self.rename_columns(user_df , ['name','review_count'], ['user_name','user_review_count'])
        return user_df
     
    '''
    ###     Utillity to drop columns from dataframe    ###
    def drop_columns(self, df , columns_to_drop = []):
        if columns_to_drop == [] :
            print("No column to drop")
            return df
        else:
            cols = ''.join(["'" + str(name)+"'," for name in columns_to_drop])[:-1]
            df = df.drop(cols)
            return df
        
    '''   
    ###     Utility to rename column names    ####
        
    def rename_columns(self, df , old_col_names, new_col_names ):
        if len(old_col_names) != len(new_col_names):
            print("Old column name list and new column name list must have equal number of elements.\nProcess failed.\nNo column renamed.")
            return df
        else:
            for old_name, new_name in zip(old_col_names , new_col_names):
                df = df.withColumnRenamed(old_name , new_name)
            return df
            

In [8]:
if __name__ == "__main__":
    dw = Data_Science_Sandbox()
    dw.select_review_df()
    #dw.join_all()
    #dw.select_business_df()

IndexError: list index out of range

In [9]:
abc = "C://Intel/"
print("File at location {}hello world{}".format(abc , 5))

File at location C://Intel/hello world5
