Skip to content

Commit

Permalink
Fix docstring and explode_table function (renamed to count_items), fi…
Browse files Browse the repository at this point in the history
…x bug in analyzer
  • Loading branch information
FavioVazquez committed Sep 24, 2017
1 parent 7c63258 commit 4cbcb80
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 40 deletions.
26 changes: 12 additions & 14 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1448,7 +1448,7 @@ New dataFrame:
| Spain| ~Madrid| 6489162|
+---------+---------------+----------+

DataFrameTransformer.explode_table(coldId, col, new_col_feature)
DataFrameTransformer.count_items(col_id, col_search, new_col_feature, search_string)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This function can be used to split a feature with some extra information
Expand All @@ -1467,7 +1467,7 @@ See the example below for more explanation:
# Building a simple dataframe:
schema = StructType([
StructField("bill id", IntegerType(), True),
StructField("bill_id", IntegerType(), True),
StructField("foods", StringType(), True)])
id_ = [1, 2, 2, 3, 3, 3, 3, 4, 4]
Expand Down Expand Up @@ -1513,7 +1513,7 @@ New DF:
transformer.show()
# Transformation:
transformer.explode_table('bill id', 'foods', 'Beer')
transformer.count_items(col_id="bill_id",col_search="foods",new_col_feature="beer_count",search_string="Beer")
# Printing new dataFrame:
print('New dataFrame:')
Expand Down Expand Up @@ -1545,17 +1545,15 @@ Original dataFrame:

New dataFrame:

+-------+---------+----+
|bill id| foods|Beer|
+-------+---------+----+
| 1| Pizza| 0|
+-------+---------+----+
| 2| Pizza| 1|
+-------+---------+----+
| 3|Hamburger| 3|
+-------+---------+----+
| 4| Pizza| 1|
+-------+---------+----+
+-------+----------+
|bill_id|beer_count|
+-------+----------+
| 3| 3|
+-------+----------+
| 4| 1|
+-------+----------+
| 2| 1|
+-------+----------+

DataFrameTransformer.date_transform(column, current_format, output_format)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
2 changes: 1 addition & 1 deletion optimus/df_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ def __init__(self, df, path_file=None, pu=0.1, seed=13):
assert isinstance(path_file, (str, type(None))), \
"Error, path_file argument must be string datatype or NoneType."
# Asserting if path includes the type of filesystem
if isinstance(path_file, type(None)):
if not isinstance(path_file, type(None)):
assert (("file:///" == path_file[0:8]) or ("hdfs:///" == path_file[0:8])), \
"Error: path must be with a 'file://' prefix \
if the file is in the local disk or a 'path://' \
Expand Down
43 changes: 18 additions & 25 deletions optimus/df_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -682,29 +682,30 @@ def move_col(self, column, ref_col, position):

return self

def explode_table(self, col_id, col1, new_col_feature, list_to_assign):
def count_items(self, col_id, col_search, new_col_feature, search_string):
"""
This function can be used to split a feature with some extra information in order
to make a new column feature.
:param col_id column name of the columnId of dataFrame
:param col1 column name of the column to be split.
:param col_search column name of the column to be split.
:param new_col_feature Name of the new column.
:param list_to_assign List of values to be counted.
:param search_string string of value to be counted.
Please, see documentation for more explanations about this method.
"""
# Asserting if position is string or list:

assert isinstance(list_to_assign, list), "Error: list_to_assign argument must be a list"
assert isinstance(search_string, str), "Error: search_string argument must be a string"

# Asserting parameters are not empty strings:
assert (
(col_id != '') and (col1 != '') and (new_col_feature != '')), "Error: Input parameters can't be empty strings"
(col_id != '') and (col_search != '') and (new_col_feature != '')), \
"Error: Input parameters can't be empty strings"

# Check if col1 argument is string datatype:
self._assert_type_str(col1, "col1")
# Check if col_search argument is string datatype:
self._assert_type_str(col_search, "col_search")

# Check if new_col_feature argument is a string datatype:
self._assert_type_str(new_col_feature, "new_col_feature")
Expand All @@ -715,36 +716,28 @@ def explode_table(self, col_id, col1, new_col_feature, list_to_assign):
# Check if col_id to be process are in dataframe
self._assert_cols_in_df(columns_provided=[col_id], columns_df=self._df.columns)

# Check if col1 to be process are in dataframe
self._assert_cols_in_df(columns_provided=[col1], columns_df=self._df.columns)
# Check if col_search to be process are in dataframe
self._assert_cols_in_df(columns_provided=[col_search], columns_df=self._df.columns)

# subset, only PAQ and Tipo_Unidad:
subdf = self._df.select(col_id, col1)

# dataframe Filtered:
df_mod = self._df.where(self._df[col1] != new_col_feature)
subdf = self._df.select(col_id, col_search)

# subset de
new_column = subdf.where(subdf[col1] == new_col_feature).groupBy(col_id).count()
new_column = subdf.where(subdf[col_search] == search_string).groupBy(col_id).count()

# Left join:
new_column = new_column.withColumnRenamed(col_id, col_id + '_other')

for x, _ in enumerate(list_to_assign):
if x == 0:
exprs = (df_mod[col_id] == new_column[col_id + '_other']) & (df_mod[col1] == list_to_assign[x])
else:
exprs = exprs | (df_mod[col_id] == new_column[col_id + '_other']) & (df_mod[col1] == list_to_assign[x])
exprs = (subdf[col_id] == new_column[col_id + '_other']) & (subdf[col_search] == search_string)

df_mod = df_mod.join(new_column, exprs, 'left_outer')
df_mod = subdf.join(new_column, exprs, 'left_outer')

# Cleaning dataframe:
df_mod = df_mod.drop(col_id + '_other').na.fill(0).withColumnRenamed('count', new_col_feature)
self._df = df_mod
df_mod = df_mod.drop(col_id + '_other').drop(col_search).withColumnRenamed('count', new_col_feature)\
.dropna("any")

self._add_transformation() # checkpoint in case

return self
print("Counting existing "+search_string + " in "+col_search)
return df_mod.sort(col_id).drop_duplicates([col_id])

def date_transform(self, columns, current_format, output_format):
"""
Expand Down

0 comments on commit 4cbcb80

Please sign in to comment.