Commit 78d4ef6

Fix testing for lookup and update doc for spark 2.0.x
FavioVazquez committed Sep 21, 2017
1 parent 3181587 commit 78d4ef6
Showing 4 changed files with 11 additions and 12 deletions.
12 changes: 6 additions & 6 deletions docs/index.rst
@@ -619,7 +619,7 @@ dataFrame:
 population = [37800000,19795791,12341418,6489162]
 # Dataframe:
-df = sqlContext.createDataFrame(list(zip(cities, countries, population)), schema=schema)
+df = op.spark.createDataFrame(list(zip(cities, countries, population)), schema=schema)
 # DataFrameTransformer Instanciation:
 transformer = op.DataFrameTransformer(df)
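
Note: this hunk (and the identical ones below) moves the docs off the Spark 1.x sqlContext entry point onto the SparkSession that Optimus exposes as op.spark. A minimal standalone sketch of the same pattern, assuming a plain SparkSession; the city and country lists are illustrative, only the population values appear in this diff:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Spark 2.x entry point; in the Optimus docs this session is reached as op.spark.
spark = SparkSession.builder.appName("optimus-docs-sketch").getOrCreate()

schema = StructType([
    StructField("city", StringType(), True),
    StructField("country", StringType(), True),
    StructField("population", IntegerType(), True),
])

# Illustrative rows; the docs' exact city/country lists are not shown in this hunk.
cities = ["Tokyo", "New York", "Sao Paulo", "Caracas"]
countries = ["Japan", "USA", "Brazil", "Venezuela"]
population = [37800000, 19795791, 12341418, 6489162]

# createDataFrame now lives on the session, replacing sqlContext.createDataFrame.
df = spark.createDataFrame(list(zip(cities, countries, population)), schema=schema)
df.show()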
@@ -1126,7 +1126,7 @@ Building a dummy dataFrame:
 population = [37800000,19795791,12341418,6489162]
 # Dataframe:
-df = sqlContext.createDataFrame(list(zip(cities, countries, population)), schema=schema)
+df = op.spark.createDataFrame(list(zip(cities, countries, population)), schema=schema)
 df.show()
@@ -1328,7 +1328,7 @@ Building a dummy dataFrame:
 population = [37800000,19795791,12341418,6489162]
 # Dataframe:
-df = sqlContext.createDataFrame(list(zip(cities, countries, population)), schema=schema)
+df = op.spark.createDataFrame(list(zip(cities, countries, population)), schema=schema)
 df.show()
@@ -1357,7 +1357,7 @@ New DF:
 transformer.show()
 # Capital letters:
-transformer.lookup('city', ['Caracas', 'Ccs'], 'Caracas')
+transformer.lookup('city', "Caracas", ['Caracas', 'Ccs'])
 # Printing new dataFrame:
 print('New dataFrame:')
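
Note: this hunk (and the matching test change at the bottom of this commit) flips lookup()'s argument order. Reading the diff alone, the new call appears to be lookup(column, new_value, values_to_match), i.e. every 'Caracas' or 'Ccs' in the city column becomes 'Caracas'; that reading is an assumption, not confirmed on this page. A before/after sketch:

transformer = op.DataFrameTransformer(df)

# Old order (pre-commit): list of values to match first, replacement last.
# transformer.lookup('city', ['Caracas', 'Ccs'], 'Caracas')

# New order (this commit): replacement first, list of values to match last.
transformer.lookup('city', "Caracas", ['Caracas', 'Ccs'])
transformer.show()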
@@ -1475,7 +1475,7 @@ See the example bellow to more explanations:
 # Dataframe:
-df = sqlContext.createDataFrame(list(zip(id_, foods)), schema=schema)
+df = op.spark.createDataFrame(list(zip(id_, foods)), schema=schema)
 df.show()
@@ -1590,7 +1590,7 @@ date_transform(self, column, current_format, output_format)
 population = [37800000,19795791,12341418,6489162]
 # Dataframe:
-df = sqlContext.createDataFrame(list(zip(cities, countries, population)), schema=schema)
+df = op.spark.createDataFrame(list(zip(cities, countries, population)), schema=schema)
 df.show()
1 change: 0 additions & 1 deletion optimus/df_transformer.py
@@ -24,7 +24,6 @@ def __init__(self, df):
         # Dataframe
         self._df = df
         # SparkContext:
-        # self._sql_context = SQLContext(self._df.sql_ctx)
         self._sql_context = self._df.sql_ctx
         self._number_of_transformations = 0
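
Note on the deleted line: df.sql_ctx is already a SQLContext, while SQLContext's constructor expects a SparkContext, so the commented-out SQLContext(self._df.sql_ctx) wrapper was never a valid construction; reusing the DataFrame's own context is enough. A minimal sketch of that pattern under Spark 2.x:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "Caracas")], ["id", "city"])

# Every DataFrame carries the context that created it.
sql_context = df.sql_ctx
print(type(sql_context).__name__)  # SQLContext (or a subclass such as HiveContext)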

8 changes: 4 additions & 4 deletions optimus/utilities.py
@@ -1,7 +1,7 @@
 # -*- coding: utf-8 -*-
 # Importing os module for system operative utilities
 import os
-# Importing SQLContext:
+# Importing SparkSession:
 from pyspark.sql.session import SparkSession
 # Importing module to delete folders
 from shutil import rmtree
@@ -19,7 +19,7 @@ class Utilities:
 class Utilities:
     def __init__(self):
 
-        # Setting SQLContext as a global variable of the class
+        # Setting spark as a global variable of the class
         self.spark = SparkSession.builder.enableHiveSupport().getOrCreate()
         # Setting SparkContent as a global variable of the class
         self.__sc = self.spark.sparkContext
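
Note: the builder call above is the standard Spark 2.x single-entry-point pattern; getOrCreate() returns the live session if one exists, and the SparkContext hangs off it. A standalone sketch of the same calls:

from pyspark.sql.session import SparkSession

# One session per application; getOrCreate() makes repeated calls safe.
# enableHiveSupport() assumes a Spark build with Hive classes, as the class above does.
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
sc = spark.sparkContext  # the underlying SparkContext, held as __sc above

print(spark.version)  # e.g. '2.0.2' on a Spark 2.0.x install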
@@ -103,7 +103,7 @@ def json_load_spark_data_frame_from_url(self, data_url):
 
     def read_dataset_parquet(self, path):
         """This function allows user to read parquet files. It is import to clarify that this method is just based
-        on the sqlContext.read.parquet(path) Apache Spark method. Only assertion instructions has been added to
+        on the spark.read.parquet(path) Apache Spark method. Only assertion instructions has been added to
         ensure user has more hints about what happened when something goes wrong.
         :param path Path or location of the file. Must be string dataType.
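
What that docstring describes, a thin wrapper that validates its argument before delegating to Spark's parquet reader, looks roughly like the sketch below. The assertion message and the standalone-function form are assumptions; the repo's method takes self and its exact checks aren't shown in this hunk.

def read_dataset_parquet(spark, path):
    # Fail early with a hint, as the docstring promises, before calling Spark.
    assert isinstance(path, str), "Error: path argument must be a string dataType"
    return spark.read.parquet(path)

# Usage: df = read_dataset_parquet(spark, "data/example.parquet")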
@@ -290,7 +290,7 @@ class Airtable:
     def __init__(self, path):
         # Setting airtable dataset variable
         self._air_table = None
-        # Setting SQLContext as a global variable of the class
+        # Setting spark as a global variable of the class
         self.spark = SparkSession()
         self.sc = self.spark.sparkContext
         # Reading dataset
2 changes: 1 addition & 1 deletion tests/tests.py
@@ -149,7 +149,7 @@ def test_rename_col(spark_session):
 def test_lookup(spark_session):
     try:
         transformer = op.DataFrameTransformer(create_df(spark_session))
-        transformer.lookup('city', ['Caracas', 'Ccs'], 'Caracas')
+        transformer.lookup('city', "Caracas", ['Caracas', 'Ccs'])
         assert_spark_df(transformer.get_data_frame)
     except RuntimeError:
         logger.exception('Could not run lookup().')
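
Note: create_df and assert_spark_df are helpers from the repo's test suite and are not shown in this diff. A hedged sketch of what such helpers plausibly look like; both bodies, and the column names, are assumptions made for illustration:

from pyspark.sql import DataFrame

def create_df(spark_session):
    # Assumed shape: mirrors the docs' city/country/population examples.
    rows = [("Caracas", "Venezuela", 6489162), ("Ccs", "Venezuela", 6489162)]
    return spark_session.createDataFrame(rows, ["city", "country", "population"])

def assert_spark_df(df):
    # The real helper's checks aren't shown; a type check is the minimal form.
    assert isinstance(df, DataFrame), "expected a Spark DataFrame"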
