In [7]:
import luigi
import time
from zipfile import ZipFile
import urllib
from tempfile import mktemp
import os
import numpy as np
import pandas as pd
from IPython.display import display
import requests
class DownloadFoodReviewsData(luigi.Task):
def run(self):
    # URL for the file download.
    gzip_file_url = "https://snap.stanford.edu/data/finefoods.txt.gz"

    # Paths for the data directory, text file, and compressed file.
    data_directory = os.path.join('..', 'data')
    if not os.path.exists(data_directory):
        print("Data directory ('{}') does not exist - creating it".format(data_directory))
        os.mkdir(data_directory)

    text_file_path = os.path.join(data_directory, 'finefoods.txt')
    gzip_file_path = text_file_path + '.gz'

    # Function to download a file from a website.
    #
    # Assumes that the file is large and should be downloaded as a stream.
    #
    # url - URL to the file
    # save_file_path - local path where the file should be saved
    # chunk_size - size of the chunks to use when saving the streamed bytes to a file; default = 8KiB
    #
    # returns - None
    def download_data_file(url, save_file_path, chunk_size=8192):
        # Request the file as a stream.
        response = requests.get(url, stream=True)

        # Save the data to the given file_path.
        with open(save_file_path, 'wb') as fd:
            for chunk in response.iter_content(chunk_size=chunk_size):
                fd.write(chunk)

        return None
    # Function to decompress a gzipped data file.
    #
    # text_file - path to the decompressed file; will be created
    # gzip_file - path to the gzipped file; must already exist
    #
    # returns - None
    def decompress_data_file(text_file, gzip_file):
        with gzip.open(gzip_file, 'rb') as infile:
            #gz_content = infile.read()
            #text_content = str(gz_content)
            with open(text_file, 'wb') as outfile:
                outfile.write(infile.read())

        return None

    # Function to get the decompressed data file.
    #
    # Does nothing if the data file already exists.
    # If the data file doesn't exist, but the compressed data file does, then it decompresses the compressed file.
    # If the compressed data file doesn't exist, then it downloads and decompresses the data file.
    #
    # text_file - path to the decompressed file; will be created if it doesn't already exist
    # gzip_file - path to the gzipped file; will be downloaded if it doesn't already exist
    #
    # returns - None
    def get_data_file(text_file, gzip_file):
        if os.path.exists(text_file):
            print("Decompressed file ('{}') already exists".format(text_file))
        else:
            print("Decompressed file ('{}') does not exist".format(text_file))			
        # To get the decompressed file, we need the gzipped file.
        if not os.path.exists(gzip_file):
            print("Compressed data file ('{}') does not exist, downloading...".format(gzip_file))
            download_data_file(gzip_file_url, gzip_file)
            print("...download finished")

            print("Compressed data file ('{}') exists, decompressing".format(gzip_file))
            decompress_data_file(text_file, gzip_file)			
        return None

    def output(self):	
        #save file to Data directory
        return luigi.LocalTarget('data/finefoods.txt')	



class ConvertFoodReviewsData(luigi.Task):    
def requires(self):
return DownloadFoodReviewsData()

def input(self):
return luigi.LocalTarget('../data/finefoods.txt')
def run(self):
# Paths for the data directory, text file, and compressed file.
data_directory = os.path.join('..', 'data')
if not os.path.exists(data_directory):
    print("[ERROR] data directory ('{}') does not exist".format(data_directory))

INPUT_FILE_NAME = "finefoods.txt"
OUTPUT_FILE_NAME = "finefoods.csv"

input_filepath = os.path.join(data_directory, INPUT_FILE_NAME)
csv_filepath = os.path.join(data_directory, OUTPUT_FILE_NAME)

header = [
    "product/productId",
    "review/userId",
    "review/profileName",
    "review/helpfulness",
    "review/score",
    "review/time",
    "review/summary",
    "review/text"]

simple_header = [
    "productId",
    "userId",
    "profileName",
    "helpfulness",
    "score",
    "time",
    "summary",
    "text"]

infile = open(input_filepath, "rt", encoding="Latin-1")
csvfile = open(csv_filepath, "wt", encoding="UTF-8")


# Write a list of fields as a comma-separated row with a pipe (|) as a quote character.
def write_quoted_fields(csvfile, field_list):
    csvfile.write('|' + field_list[0] + '|')
    for field in field_list[1:]:
        csvfile.write(',|' + field + '|')
    csvfile.write("\n")


# Write the header line.
write_quoted_fields(csvfile, simple_header)

# Useful controls during debugging.
record_limit = 1000000
troublesome_records = [] #370, 211557, 226163, 519217, 521382, 525958, 531539

field_count = len(header)
line_count = 0
record_count = 0
currentLine = []
for line in infile:
    #print("Processing line: {}".format(line.strip()))
    line_count += 1
    line = line.strip()

    if line == "":
        if len(currentLine) == field_count:
            write_quoted_fields(csvfile, currentLine)
            record_count += 1

            if (record_count+1) in troublesome_records:
                print("[WARN] troublesome record -1: {}".format(currentLine))

            if record_count in troublesome_records:
                print("[WARN] troublesome record: {}".format(currentLine))

            if (record_count-1) in troublesome_records:
                print("[WARN] troublesome record +1: {}".format(currentLine))

        else:
            print("[WARN] current record appears to be incomplete: {}".format(currentLine))

        currentLine = []
        continue

    parts = line.split(": ", 1)

    # Check to see if the line looks sensible enough to be added.
    if len(parts) == 2:
        # If there are pipe characters in the text (unlikely), replace them with a slash.
        field = parts[1].strip().replace('|', '/')
        currentLine.append(field)
    else:
        # Throw this away - there are junk lines in the raw file, e.g.:
        # review/profileName: Sherry "Tell us about yourself!
        # School Princi...
        print("[WARN] only found {} parts after splitting line: {}".format(len(parts), parts))
        print("[WARN] line was: {}".format(line))

    if record_count > record_limit:
        break


# Write the final record (if it is complete).
if len(currentLine) == field_count:
    write_quoted_fields(csvfile, currentLine)
    record_count += 1

# Close files.
infile.close()
csvfile.close()

print("Finished - wrote {} lines and {} records.".format(line_count, record_count))


column_dtypes = {'productId': str, 'userId': str, 'profileName': str, 'helpfulness': str,
                 'score': np.float64, 'time': np.int64, 'summary': str, 'text': str}

# For this dataset, 'quoting' must be set to QUOTE_ALL (1) and the quotechar to a pipe (|).
# The problem is that values in some 'text' fields begin with a ", but don't end with one,
# and many review texts contain commas, unbalanced quotes and apostrophes.
review_df = pd.read_table(csv_filepath, delimiter=',', encoding="UTF-8", dtype=column_dtypes, 
                          quoting=1, quotechar='|', engine="c", skip_blank_lines=True, 
                          error_bad_lines=False, warn_bad_lines=True)
# Convert score to an int, since it isn't truly a float.
review_df['score'] = review_df['score'].astype(int)

review_df['helpfulness_numerator'] = [x[0] for x in review_df['helpfulness'].str.split('/')]
review_df['helpfulness_denominator'] = [x[1] for x in review_df['helpfulness'].str.split('/')]
review_df['date'] = pd.to_datetime(review_df['time'], unit='s')
del review_df['helpfulness']
review_df.to_csv("data/Reviews.csv",index=False)
def output(self):
#save file to Data directory
return luigi.LocalTarget('data/Reviews.csv')

if __name__ == '__main__':
luigi.run()			


IndentationError: expected an indented block (<ipython-input-7-56759690efa7>, line 12)