Skip to content

Commit

Permalink
Extract removing auto-pk column into one function
Browse files Browse the repository at this point in the history
  • Loading branch information
calherries committed Dec 13, 2023
1 parent 9621040 commit ffe0385
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 187 deletions.
234 changes: 99 additions & 135 deletions src/metabase/upload.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
[clj-bom.core :as bom]
[clojure.data :as data]
[clojure.data.csv :as csv]
[clojure.java.io :as io]
[clojure.string :as str]
[flatland.ordered.map :as ordered-map]
[flatland.ordered.set :as ordered-set]
Expand Down Expand Up @@ -218,54 +217,24 @@
"_mb_row_id")

(defn- get-auto-pk-column
  "Returns the first Field of `table-id` whose normalized name equals
   [[auto-pk-column-name]], or nil when the table has no such column."
  [table-id]
  (->> (t2/select :model/Field :table_id table-id)
       (filter #(= auto-pk-column-name (normalize-column-name (:name %))))
       first))
(first (filter (fn [field]
(= (normalize-column-name (:name field)) auto-pk-column-name))
(t2/select :model/Field :table_id table-id :active true))))

(defn- remove-indices
  "Removes the elements at the given indices from the collection. Indices is a set.
   Returns a lazy seq.

   NOTE: implemented via map-indexed/remove rather than keep-indexed, because
   keep-indexed discards nil RESULTS — which would silently drop nil elements
   at indices we intended to keep, contradicting this docstring's contract."
  [indices coll]
  (->> coll
       (map-indexed vector)
       (remove (fn [[idx _]] (contains? indices idx)))
       (map second)))

(defn- indices-where
  "Returns a lazy seq of the indices where the predicate is true."
  [pred coll]
  (->> coll
       (map-indexed vector)
       (filter (fn [[_ item]] (pred item)))
       (map first)))
(defn- detect-schema
"Consumes the header and rows from a CSV file.
(defn- parse-rows*
  "Returns a lazy seq of parsed rows, given a sequence of upload types for each column.
   Replaces empty strings with nil."
  [col-upload-types rows]
  (let [settings   (upload-parsing/get-settings)
        parsers    (map #(upload-parsing/upload-type->parser % settings) col-upload-types)
        ;; blank cells (empty/whitespace-only strings, or nils padded in by
        ;; map-with-nils when a row is shorter than the parser seq) become nil
        parse-cell (fn [[value parser]]
                     (when-not (str/blank? value)
                       (parser value)))]
    (map (fn [row]
           (map parse-cell (map-with-nils vector row parsers)))
         rows)))
Returns a map with two keys:
- `:extant-columns`: an ordered map of columns found in the CSV file, excluding columns that have the same normalized name as the generated columns.
- `:generated-columns`: an ordered map of columns we are generating ourselves. Currently, this is just the auto-incrementing PK.
(defn- rows->schema
"Rows should be a lazy-seq. This function hides the logic for ignoring any columns in the CSV that have the same
normalized name as the auto-pk-column-name."
The value of `extant-columns` and `generated-columns` is an ordered map of normalized-column-name -> type for the
given CSV file. Supported types include `::int`, `::datetime`, etc. A column that is completely blank is assumed to
be of type `::text`."
[header rows]
(let [normalized-header (->> header
(map normalize-column-name))
;; remove columns from the rows with the same normalized name as the auto-pk-column-name
auto-pk-col-indices (set (indices-where #(= auto-pk-column-name %) normalized-header))
;; this function removes the auto-pk columns from each row for parsing and type detection
remove-auto-pk-cols (fn [row]
(remove-indices auto-pk-col-indices row))
rows (cond->> rows
(seq auto-pk-col-indices)
(map remove-auto-pk-cols))
unique-header (->> normalized-header
remove-auto-pk-cols
mbql.u/uniquify-names
(map keyword))
column-count (count normalized-header)
Expand All @@ -276,13 +245,7 @@
(map #(or % ::text))
(map vector unique-header))]
{:extant-columns (ordered-map/ordered-map col-name+type-pairs)
:generated-columns (ordered-map/ordered-map (keyword auto-pk-column-name) ::auto-incrementing-int-pk)
:parse-rows (fn [rows']
(let [rows' (cond->> rows'
(seq auto-pk-col-indices)
(map remove-auto-pk-cols))
col-types (map second col-name+type-pairs)]
(parse-rows* col-types rows')))}))
:generated-columns (ordered-map/ordered-map (keyword auto-pk-column-name) ::auto-incrementing-int-pk)}))


;;;; +------------------+
Expand Down Expand Up @@ -315,23 +278,6 @@
[driver col->upload-type]
(update-vals col->upload-type (partial driver/upload-type->database-type driver)))

(defn- detect-schema
"Consumes a CSV file that *must* have headers as the first row.
Returns a map with three keys:
- `:extant-columns`: an ordered map of columns found in the CSV file, excluding columns that have the same normalized name as the generated columns.
- `:generated-columns`: an ordered map of columns we are generating ourselves. Currently, this is just the auto-incrementing PK.
- `:parse-rows`: a function that takes a single lazy-seq of rows, and returns a lazy-seq of rows. It parses the
string values in each row and removes columns that have the same normalized name as the generated columns.
The value of `extant-columns` and `generated-columns` is an ordered map of normalized-column-name -> type for the
given CSV file. Supported types include `::int`, `::datetime`, etc. A column that is completely blank is assumed to
be of type ::text."
[csv-file]
(with-open [reader (bom/bom-reader csv-file)]
(let [[header & rows] (csv/read-csv reader)]
(rows->schema header (sample-rows rows)))))

(defn current-database
"The database being used for uploads (as per the `uploads-database-id` setting)."
[]
Expand All @@ -347,29 +293,69 @@
name
(str schema "." name)))

(defn- parse-rows
  "Returns a lazy seq of parsed rows, given a sequence of upload types for each column.
   Replaces empty strings with nil."
  [col-upload-types rows]
  (let [settings (upload-parsing/get-settings)
        parsers  (map #(upload-parsing/upload-type->parser % settings) col-upload-types)]
    (map (fn [row]
           ;; map-with-nils pairs each cell with its column's parser, padding the
           ;; shorter sequence with nils; blank cells parse to nil
           (for [[value parser] (map-with-nils vector row parsers)]
             (when-not (str/blank? value)
               (parser value))))
         rows)))

(defn- remove-indices
  "Removes the elements at the given indices from the collection. Indices is a set.
   Returns a lazy seq.

   NOTE: implemented via map-indexed/remove rather than keep-indexed, because
   keep-indexed discards nil RESULTS — which would silently drop nil elements
   at indices we intended to keep, contradicting this docstring's contract."
  [indices coll]
  (->> coll
       (map-indexed vector)
       (remove (fn [[idx _]] (contains? indices idx)))
       (map second)))

(defn- indices-where
  "Returns a lazy seq of the indices where the predicate is true."
  [pred coll]
  (->> coll
       (map-indexed (fn [idx item] (when (pred item) idx)))
       (remove nil?)))

(defn- auto-pk-column-indices
  "Returns the indices of columns that have the same normalized name as [[auto-pk-column-name]]"
  [header]
  (->> header
       (indices-where (fn [col] (= auto-pk-column-name (normalize-column-name col))))
       set))

(defn- rows-without-auto-pk-columns
  "Strips any column whose normalized header name collides with [[auto-pk-column-name]]
   from every row (the header row included). Returns `rows` untouched when there is
   no colliding column. Expects the header to be the first element of `rows`."
  [rows]
  (let [header          (first rows)
        auto-pk-indices (auto-pk-column-indices header)]
    ;; BUGFIX: `auto-pk-indices` is a set, and in Clojure an empty set #{} is
    ;; truthy — testing the set itself made the guard always true, mapping every
    ;; row through remove-indices even when nothing needed removing. Guard on
    ;; `seq` so the no-collision case short-circuits and returns `rows` as-is.
    (cond->> rows
      (seq auto-pk-indices)
      (map (partial remove-indices auto-pk-indices)))))

(defn- load-from-csv!
"Loads a table from a CSV file. If the table already exists, it will throw an error.
Returns the file size, number of rows, and number of columns."
[driver db-id table-name ^File csv-file]
(let [{:keys [extant-columns generated-columns parse-rows]} (detect-schema csv-file)
cols->upload-type (merge generated-columns extant-columns)
col-to-create->col-spec (upload-type->col-specs driver cols->upload-type)
csv-col-names (keys extant-columns)]
(driver/create-table! driver db-id table-name col-to-create->col-spec)
(try
(with-open [reader (io/reader csv-file)]
(let [rows (->> (csv/read-csv reader)
(drop 1) ; drop header
parse-rows)]
(driver/insert-into! driver db-id table-name csv-col-names rows)
{:num-rows (count rows)
:num-columns (count extant-columns)
:generated-columns (count generated-columns)
:size-mb (/ (.length csv-file)
1048576.0)}))
(catch Throwable e
(driver/drop-table! driver db-id table-name)
(throw (ex-info (ex-message e) {:status-code 400}))))))
(with-open [reader (bom/bom-reader csv-file)]
(let [[header & rows] (rows-without-auto-pk-columns (csv/read-csv reader))
{:keys [extant-columns generated-columns]} (detect-schema header (sample-rows rows))
cols->upload-type (merge generated-columns extant-columns)
col-to-create->col-spec (upload-type->col-specs driver cols->upload-type)
csv-col-names (keys extant-columns)
col-upload-types (vals extant-columns)
parsed-rows (vec (parse-rows col-upload-types rows))]
(driver/create-table! driver db-id table-name col-to-create->col-spec)
(try
(driver/insert-into! driver db-id table-name csv-col-names parsed-rows)
{:num-rows (count rows)
:num-columns (count extant-columns)
:generated-columns (count generated-columns)
:size-mb (/ (.length csv-file)
1048576.0)}
(catch Throwable e
(driver/drop-table! driver db-id table-name)
(throw (ex-info (ex-message e) {:status-code 400})))))))

;;;; +------------------+
;;;; | Create upload
Expand Down Expand Up @@ -539,76 +525,54 @@
:type/Text ::text))

(defn- check-schema
"Returns a map with three keys:
- `parse-rows`, a function that takes a lazy-seq of rows, and returns a lazy-seq of rows. It parses the
string values in each row and removes columns that have the same normalized name as the generated columns.
- `table-col-names`, a seq of the column names in the table, in the same order as the columns in the CSV file. Excludes the auto-pk column.
- `table-has-auto-pk?`, a boolean indicating whether the table already contains the auto-pk column.
Throws an exception if:
"Throws an exception if:
- the CSV file contains duplicate column names
- the schema of the CSV file does not match the schema of the table"
[table header]
[fields-by-normed-name header]
;; Assumes table-cols are unique when normalized
(let [fields (t2/select :model/Field :table_id (:id table))
fields-by-normed-name (m/index-by (comp normalize-column-name :name)
fields)
normalized-field-names (keys fields-by-normed-name)
auto-pk-col-indices (set (indices-where #(= auto-pk-column-name (normalize-column-name %)) header))
;; this function removes the auto-pk columns from each row and the header
remove-auto-pk-cols (fn [row]
(remove-indices auto-pk-col-indices row))
(let [normalized-field-names (keys fields-by-normed-name)
;; TODO: use this to create nice error messages when there are extra or missing columns
normalized-header+header (->> header
(map (fn [col]
[(normalize-column-name col) col]))
remove-auto-pk-cols)
[(normalize-column-name col) col])))
normalized-header (map first normalized-header+header)
[extra missing _both] (data/diff (set normalized-header)
;; ignore the auto-pk field, it will be filled automatically
(disj (set normalized-field-names) auto-pk-column-name))]
[extra missing _both] (data/diff (set normalized-header) (set normalized-field-names))]
;; check for duplicates
(when (some #(< 1 %) (vals (frequencies normalized-header)))
(throw (ex-info (tru "The CSV file contains duplicate column names.")
{:status-code 422})))
(if (and (empty? extra) (empty? missing))
{:parse-rows (fn [rows]
(let [rows' (cond->> rows
(seq auto-pk-col-indices)
(map remove-auto-pk-cols))
col-upload-types (map (comp base-type->upload-type :base_type fields-by-normed-name)
normalized-header)]
(parse-rows* col-upload-types rows')))
:table-col-names (map (comp :name fields-by-normed-name) normalized-header)
:table-has-auto-pk? (contains? fields-by-normed-name auto-pk-column-name)}
(when (or extra missing)
(throw (ex-info (tru "The schema of the CSV file does not match the schema of the table.")
{:status-code 422})))))

(defn- append-csv!*
[database table file]
(with-open [reader (bom/bom-reader file)]
(let [[header & rows] (csv/read-csv reader)
{:keys [parse-rows table-col-names table-has-auto-pk?]} (check-schema table header)
driver (driver.u/database->driver database)
auto-pk-type (driver/upload-type->database-type driver ::auto-incrementing-int-pk)
create-auto-pk? (not table-has-auto-pk?)]
(let [[header & rows] (rows-without-auto-pk-columns (csv/read-csv reader))
driver (driver.u/database->driver database)
fields-by-normed-name (m/index-by (comp normalize-column-name :name)
(t2/select :model/Field :table_id (:id table) :active true))
normed-header (map normalize-column-name header)
create-auto-pk? (not (contains? fields-by-normed-name auto-pk-column-name))
_ (check-schema (dissoc fields-by-normed-name auto-pk-column-name) header)
col-upload-types (map (comp base-type->upload-type :base_type fields-by-normed-name) normed-header)
parsed-rows (parse-rows col-upload-types rows)]
(try
(let [parsed-rows (parse-rows rows)]
(when create-auto-pk?
(driver/add-columns! driver
(:id database)
(table-identifier table)
{:_mb_row_id auto-pk-type}))
(driver/insert-into! driver
(when create-auto-pk?
(driver/add-columns! driver
(:id database)
(table-identifier table)
table-col-names
parsed-rows)
(scan-and-sync-table! database table)
(when create-auto-pk?
(let [auto-pk-field (get-auto-pk-column (:id table))]
(t2/update! :model/Field (:id auto-pk-field) {:display_name (:name auto-pk-field)})))
{:row-count (count parsed-rows)})
{:_mb_row_id (driver/upload-type->database-type driver ::auto-incrementing-int-pk)}))
(driver/insert-into! driver
(:id database)
(table-identifier table)
normed-header
parsed-rows)
(scan-and-sync-table! database table)
(when create-auto-pk?
(let [auto-pk-field (get-auto-pk-column (:id table))]
(t2/update! :model/Field (:id auto-pk-field) {:display_name (:name auto-pk-field)})))
{:row-count (count parsed-rows)}
(catch Throwable e
;; TODO: insert rows failure handling
(throw (ex-info (ex-message e) {:status-code 400})))))))
Expand Down

0 comments on commit ffe0385

Please sign in to comment.