Showing with 181 additions and 77 deletions.
  1. +4 −3 bin/run_migrations.sh
  2. +11 −5 common-pg/src/main/scala/com/socrata/pg/soql/SqlFunctions.scala
  3. +3 −3 common-pg/src/main/scala/com/socrata/pg/store/RollupManager.scala
  4. +7 −4 common-pg/src/main/scala/com/socrata/pg/store/SoQLCommon.scala
  5. +3 −0 common-pg/src/main/scala/com/socrata/pg/store/index/Indexable.scala
  6. +13 −6 common-pg/src/main/scala/com/socrata/soql/analyzer/SoQLAnalyzerHelper.scala
  7. +11 −3 common-pg/src/test/scala/com/socrata/pg/soql/SqlizerTest.scala
  8. +1 −1 common-pg/src/test/scala/com/socrata/pg/store/PGSecondaryUniverseTestBase.scala
  9. +2 −2 project/Dependencies.scala
  10. +4 −2 soql-server-pg/src/main/scala/com/socrata/pg/server/CJSONWriter.scala
  11. +11 −6 soql-server-pg/src/main/scala/com/socrata/pg/server/QueryServer.scala
  12. +2 −1 soql-server-pg/src/test/resources/fixtures/group-case.json
  13. +1 −1 soql-server-pg/src/test/resources/fixtures/group-floatingtimestamp-trunc-y.json
  14. +2 −2 soql-server-pg/src/test/resources/fixtures/group-text-expr.json
  15. +9 −5 soql-server-pg/src/test/resources/fixtures/group-text-order-count.json
  16. +5 −1 soql-server-pg/src/test/resources/fixtures/group-text.json
  17. +3 −1 soql-server-pg/src/test/resources/fixtures/mutate-create.json
  18. +1 −0 soql-server-pg/src/test/resources/fixtures/result.json
  19. +9 −0 soql-server-pg/src/test/resources/fixtures/select-is-empty.json
  20. +1 −0 soql-server-pg/src/test/resources/fixtures/select-signed_magnitude_10.json
  21. +15 −14 soql-server-pg/src/test/resources/fixtures/select-signed_magnitude_linear.json
  22. +2 −0 soql-server-pg/src/test/resources/fixtures/where-conv-txt2blob.json
  23. +12 −1 soql-server-pg/src/test/resources/fixtures/where-null-isnull.json
  24. +1 −0 soql-server-pg/src/test/resources/fixtures/where-str-ne.json
  25. +3 −2 soql-server-pg/src/test/scala/com/socrata/pg/server/PGQueryServerDatabaseTestBase.scala
  26. +2 −1 soql-server-pg/src/test/scala/com/socrata/pg/server/QueryTest.scala
  27. +4 −0 soql-server-pg/src/test/scala/com/socrata/pg/server/SoQLConversionFunctionsTest.scala
  28. +24 −2 soql-server-pg/src/test/scala/com/socrata/pg/server/SoQLGeomFunctionsTest.scala
  29. +2 −2 soql-server-pg/src/test/scala/com/socrata/pg/server/SoQLGroupTest.scala
  30. +3 −3 soql-server-pg/src/test/scala/com/socrata/pg/server/SoQLLimitOffsetTest.scala
  31. +4 −0 soql-server-pg/src/test/scala/com/socrata/pg/server/SoQLMathFunctionsTest.scala
  32. +1 −1 store-pg/docker/Dockerfile
  33. +1 −1 store-pg/src/test/scala/com/socrata/pg/store/RollupDropTest.scala
  34. +3 −3 store-pg/src/test/scala/com/socrata/pg/store/RollupTest.scala
  35. +1 −1 version.sbt
@@ -2,14 +2,15 @@
BASEDIR=$(dirname "$0")/..
CONFIG=${SODA_CONFIG:-$BASEDIR/../docs/onramp/services/soda2.conf}

JARS=( $BASEDIR/store-pg/target/scala-*/store-pg-assembly-*.jar )
JARS=( $BASEDIR/store-pg/target/scala-2.10/store-pg-assembly-*.jar )
# shellcheck disable=SC2012
JARFILE=$(ls -t "${JARS[@]}" | head -n 1)

if [ ! -e "$JARFILE" ]; then
cd "$BASEDIR" && sbt assembly
JARS=( $BASEDIR/store-pg/target/scala-*/store-pg-assembly-*.jar )
JARS=( $BASEDIR/store-pg/target/scala-2.10/store-pg-assembly-*.jar )
# shellcheck disable=SC2012
JARFILE=$(ls -t "${JARS[@]}" | head -n 1)
fi
java -Dconfig.file="$CONFIG" -cp "$JARFILE" com.socrata.pg.store.MigrateSchema Migrate
ARGS=(Migrate)
java -Dconfig.file="$CONFIG" -cp "$JARFILE" com.socrata.pg.store.MigrateSchema "${ARGS[@]}"
@@ -1,20 +1,20 @@
package com.socrata.pg.soql

import scala.util.parsing.input.NoPosition

import com.socrata.datacoordinator.id.UserColumnId
import com.socrata.datacoordinator.truth.sql.SqlColumnRep
import com.socrata.soql.functions._
import com.socrata.soql.typed.{CoreExpr, FunctionCall, NumberLiteral, StringLiteral}
import com.socrata.soql.types.SoQLID.{StringRep => SoQLIDRep}
import com.socrata.soql.types.SoQLVersion.{StringRep => SoQLVersionRep}
import com.socrata.soql.types._
import Sqlizer._

import scala.util.parsing.input.NoPosition
import Sqlizer._
import SoQLFunctions._

// scalastyle:off magic.number multiple.string.literals
object SqlFunctions {
import SoQLFunctions._ // scalastyle:ignore import.grouping

type FunCall = FunctionCall[UserColumnId, SoQLType]

type FunCallToSql =
@@ -54,6 +54,7 @@ object SqlFunctions {
Simplify -> formatSimplify("ST_Simplify(%s, %s)") _,
SimplifyPreserveTopology -> formatSimplify("ST_SimplifyPreserveTopology(%s, %s)") _,
SnapToGrid -> formatSimplify("ST_SnapToGrid(%s, %s)") _,
IsEmpty -> isEmpty,
VisibleAt -> visibleAt,
Between -> formatCall("%s between %s and %s") _,
Lt -> infix("<") _,
@@ -76,7 +77,8 @@ object SqlFunctions {
UnaryPlus -> passthrough,
UnaryMinus -> formatCall("-%s") _,
SignedMagnitude10 -> formatCall("sign(%s) * length(floor(abs(%s))::text)", Some(Seq(0,0))),
SignedMagnitudeLinear -> formatCall("sign(%s) * floor(abs(%s)/%s + 1)", Some(Seq(0,0,1))),
SignedMagnitudeLinear ->
formatCall("case when %s = 1 then floor(%s) else sign(%s) * floor(abs(%s)/%s + 1) end", Some(Seq(1,0,0,0,1))),
BinaryPlus -> infix("+") _,
BinaryMinus -> infix("-") _,
TimesNumNum -> infix("*") _,
@@ -107,6 +109,7 @@ object SqlFunctions {
TextToFixedTimestamp -> formatCall("%s::timestamp with time zone") _,
TextToFloatingTimestamp -> formatCall("%s::timestamp") _, // without time zone
TextToMoney -> formatCall("%s::numeric") _,
TextToBlob -> passthrough,

TextToBool -> formatCall("%s::boolean") _,
BoolToText -> formatCall("%s::varchar") _,
@@ -314,6 +317,9 @@ object SqlFunctions {
Some(Seq(0, 1, 0, 0, 0, 0, 0, 0, 0))) _
}

// SQL template for IsEmpty. Seq(0, 0) feeds argument 0 into both placeholders, so the
// generated expression is true when ST_IsEmpty holds for the argument or when it is null.
private def isEmpty =
formatCall("ST_IsEmpty(%s) or %s is null", Some(Seq(0, 0))) _

private def visibleAt =
formatCall(
"""(NOT ST_IsEmpty(%s)) AND (ST_GeometryType(%s) = 'ST_Point' OR ST_GeometryType(%s) = 'ST_MultiPoint' OR
@@ -102,7 +102,7 @@ class RollupManager(pgu: PGSecondaryUniverse[SoQLType, SoQLValue], copyInfo: Cop
// the column back. It would be ideal if we had a better way to communicate this failure upwards through
// the stack.
val prefixedRollupAnalysis: Try[SoQLAnalysis[ColumnName, SoQLAnalysisType]] =
Try { analyzer.analyzeFullQuery(rollupInfo.soql)(prefixedDsContext) }
Try { analyzer.analyzeUnchainedQuery(rollupInfo.soql)(prefixedDsContext) }


prefixedRollupAnalysis match {
@@ -266,8 +266,8 @@ class RollupManager(pgu: PGSecondaryUniverse[SoQLType, SoQLValue], copyInfo: Cop

private def analysisToSoQLType(analysis: ASysCol): AUserCol = {
val baos = new ByteArrayOutputStream
SoQLAnalyzerHelper.serializer(baos, analysis.mapColumnIds(name => new UserColumnId(name.name)))
SoQLAnalyzerHelper.deserializer(new ByteArrayInputStream(baos.toByteArray))
SoQLAnalyzerHelper.serialize(baos, analysis.mapColumnIds(name => new UserColumnId(name.name)))
SoQLAnalyzerHelper.deserialize(new ByteArrayInputStream(baos.toByteArray))
}
}

@@ -83,22 +83,25 @@ class PostgresUniverseCommon(val tablespace: String => Option[String],

val writeLockTimeout = Duration.create(1, TimeUnit.MINUTES)

def idObfuscationContextFor(cryptProvider: CryptProvider): SoQLID.StringRep = new SoQLID.StringRep(cryptProvider)
// Chooses the id representation built from `cryptProvider`: SoQLID.StringRep when
// `obfuscateId` is true, SoQLID.ClearNumberRep otherwise. The declared return type is
// SoQLID.StringRep for both branches.
// NOTE(review): ClearNumberRep presumably renders ids without obfuscation - confirm in soql-types.
def idObfuscationContextFor(cryptProvider: CryptProvider, obfuscateId: Boolean): SoQLID.StringRep = {
if (obfuscateId) { new SoQLID.StringRep(cryptProvider) }
else { new SoQLID.ClearNumberRep(cryptProvider) }
}

// Builds the SoQLVersion.StringRep backed by the given CryptProvider.
def versionObfuscationContextFor(cryptProvider: CryptProvider): SoQLVersion.StringRep =
new SoQLVersion.StringRep(cryptProvider)

def jsonReps(datasetInfo: DatasetInfo): SoQLType => JsonColumnRep[SoQLType,SoQLValue] = {
def jsonReps(datasetInfo: DatasetInfo, obfuscateId: Boolean): SoQLType => JsonColumnRep[SoQLType,SoQLValue] = {
val cp = new CryptProvider(datasetInfo.obfuscationKey)
SoQLRep.jsonRep(idObfuscationContextFor(cp), versionObfuscationContextFor(cp))
SoQLRep.jsonRep(idObfuscationContextFor(cp, obfuscateId), versionObfuscationContextFor(cp))
}

def rowPreparer(transactionStart: DateTime, // scalastyle:ignore method.length
ctx: DatasetCopyContext[SoQLType],
replaceUpdatedRows: Boolean): RowPreparer[SoQLValue] =
new RowPreparer[SoQLValue] {
val schema = ctx.schemaByUserColumnId
lazy val jsonRepFor = jsonReps(ctx.datasetInfo)
lazy val jsonRepFor = jsonReps(ctx.datasetInfo, true)

def findCol(name: UserColumnId) =
schema.getOrElse(name, sys.error(s"No $name column?")).systemId
@@ -107,6 +107,8 @@ trait TimestampLikeIndexable[T] extends BaseIndexable[T] { this: SqlColumnCommon

trait BooleanIndexable[T] extends BaseIndexable[T] { this: SqlColumnCommonRep[T] => }

// Index mix-in for blob column reps. It overrides nothing, so its behaviour is whatever
// BaseIndexable provides; the self-type requires it to be mixed into a SqlColumnCommonRep.
trait BlobIndexable[T] extends BaseIndexable[T] { this: SqlColumnCommonRep[T] => }

trait GeoIndexable[T] extends BaseIndexable[T] { this: SqlColumnCommonRep[T] =>

override def createIndex(tableName: String, tablespace: String): Option[String] = {
@@ -154,6 +156,7 @@ object SoQLIndexableRep {
SoQLDouble -> (base => new DoubleRep(base) with NumberLikeIndexable[SoQLType]),
SoQLObject -> (base => new ObjectRep(base) with NoIndex[SoQLType]), // TODO: Revisit index need
SoQLArray -> (base => new ArrayRep(base) with NoIndex[SoQLType]), // TODO: Revisit index need
SoQLBlob -> (base => new BlobRep(base) with BlobIndexable[SoQLType]), // TODO: Revisit index need
SoQLPoint -> (base =>
new GeometryLikeRep[Point](
SoQLPoint,
@@ -5,13 +5,20 @@ import com.socrata.soql.{SoQLAnalysis, SoQLAnalyzer, AnalysisDeserializer, Analy
import com.socrata.soql.functions.{SoQLFunctions, SoQLFunctionInfo, SoQLTypeInfo}
import com.socrata.soql.environment.{ColumnName, TypeName, DatasetContext}
import com.socrata.soql.types.{SoQLAnalysisType, SoQLType}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{OutputStream, InputStream, ByteArrayInputStream, ByteArrayOutputStream}
import com.socrata.datacoordinator.id.UserColumnId

object SoQLAnalyzerHelper {
val serializer = new AnalysisSerializer(serializeColumn, serializeAnalysisType)
private val serializer = new AnalysisSerializer(serializeColumn, serializeAnalysisType)

val deserializer = new AnalysisDeserializer(deserializeColumn, deserializeType, SoQLFunctions.functionsByIdentity)
private val deserializer = new AnalysisDeserializer(deserializeColumn,
deserializeType,
SoQLFunctions.functionsByIdentity)

/** Writes one analysis to `outputStream`.
  *
  * The analysis is wrapped in a one-element Seq before being handed to the private
  * `serializer`, so callers can keep passing a single analysis.
  */
def serialize(outputStream: OutputStream, analysis: SoQLAnalysis[UserColumnId, SoQLAnalysisType]): Unit =
serializer(outputStream, Seq(analysis))

/** Reads analyses from `inputStream` via the private `deserializer` and returns the first one.
  *
  * NOTE(review): `.head` will throw if the deserializer yields nothing - confirm that callers
  * never send a payload with zero analyses.
  */
def deserialize(inputStream: InputStream): SoQLAnalysis[UserColumnId, SoQLType] = deserializer(inputStream).head

private val analyzer = new SoQLAnalyzer(SoQLTypeInfo, SoQLFunctionInfo)

@@ -20,10 +27,10 @@ object SoQLAnalyzerHelper {
idMap: ColumnName => UserColumnId): SoQLAnalysis[UserColumnId, SoQLType] = {
implicit val ctx: DatasetContext[SoQLAnalysisType] = toAnalysisType(datasetCtx)

val analysis: SoQLAnalysis[ColumnName, SoQLAnalysisType] = analyzer.analyzeFullQuery(soql)
val analysis: SoQLAnalysis[ColumnName, SoQLAnalysisType] = analyzer.analyzeUnchainedQuery(soql)
val baos = new ByteArrayOutputStream
serializer(baos, analysis.mapColumnIds(idMap))
deserializer(new ByteArrayInputStream(baos.toByteArray))
serialize(baos, analysis.mapColumnIds(idMap))
deserialize(new ByteArrayInputStream(baos.toByteArray))
}

private def serializeColumn(c: UserColumnId) = c.underlying
@@ -97,6 +97,14 @@ class SqlizerTest extends FunSuite with Matchers {
params should be (Seq("POINT(0 0)"))
}

// is_empty on a geometry column should sqlize to ST_IsEmpty plus an explicit null check on the
// same column, and should bind no parameters.
test("is empty") {
val soql = "select is_empty(multipolygon)"
val ParametricSql(sql, setParams) = sqlize(soql, CaseSensitive)
val expected = "SELECT (ST_IsEmpty(multipolygon) or multipolygon is null) FROM t1"
// Collapse whitespace runs to single spaces before comparing with the one-line expectation.
sql.replaceAll("\\s+", " ") should be (expected)
setParams.length should be (0)
}

test("visible at") {
val soql = "select visible_at(multipolygon, 0.03)"
val ParametricSql(sql, setParams) = sqlize(soql, CaseSensitive)
@@ -200,10 +208,10 @@ class SqlizerTest extends FunSuite with Matchers {
test("signed magnitude linear") {
val soql = "select signed_magnitude_linear(year, 42)"
val ParametricSql(sql, setParams) = sqlize(soql, CaseSensitive)
sql should be("SELECT (sign(year) * floor(abs(year)/? + 1)) FROM t1")
setParams.length should be(1)
sql should be("SELECT (case when ? = 1 then floor(year) else sign(year) * floor(abs(year)/? + 1) end) FROM t1")
setParams.length should be(2)
val params = setParams.map { (setParam) => setParam(None, 0).get }
params should be(Seq(42))
params should be(Seq(42,42))
}

test("case fn") {
@@ -212,5 +212,5 @@ trait PGSecondaryUniverseTestBase extends FunSuiteLike with Matchers with Before
/**
* TODO: Remove types in this set once support is added.
*/
val UnsupportedTypes = Set("json", "blob").map(TypeName(_))
val UnsupportedTypes = Set("json").map(TypeName(_))
}
@@ -18,9 +18,9 @@ object Dependencies {
val socrataCuratorUtils = "1.0.1"
val socrataThirdPartyUtils = "4.0.1"
val socrataHttpCuratorBroker = "3.3.0"
val soqlStdlib = "0.8.6"
val soqlStdlib = "1.0.0"
val typesafeConfig = "1.0.0"
val dataCoordinator = "0.7.17"
val dataCoordinator = "0.7.21"
val typesafeScalaLogging = "1.1.0"
val rojomaJson = "3.2.0"
val metricsJetty = "3.1.0"
@@ -44,18 +44,20 @@ object CJSONWriter {
val dateTimeFormat = ISODateTimeFormat.dateTime
val utf8EncodingName = scala.io.Codec.UTF8.name

def writeCJson(datasetInfo: DatasetInfo, // scalastyle:ignore method.length
def writeCJson(// scalastyle:ignore method.length parameter.number
datasetInfo: DatasetInfo,
qrySchema: OrderedMap[ColumnId, ColumnInfo[SoQLType]],
rowData:CloseableIterator[Row[SoQLValue]],
reqRowCount: Boolean,
givenRowCount: Option[Long],
dataVersion: Long,
lastModified: DateTime,
obfuscateId: Boolean,
locale: String = "en_US"): HttpServletResponse => Unit = (r: HttpServletResponse) => {
r.setContentType("application/json")
r.setCharacterEncoding(utf8EncodingName)
val os = r.getOutputStream
val jsonReps = PostgresUniverseCommon.jsonReps(datasetInfo)
val jsonReps = PostgresUniverseCommon.jsonReps(datasetInfo, obfuscateId)

val (rowCount, rows) = givenRowCount match {
case None if reqRowCount =>
@@ -45,6 +45,7 @@ import com.socrata.soql.analyzer.SoQLAnalyzerHelper
import com.socrata.soql.collection.OrderedMap
import com.socrata.soql.environment.ColumnName
import com.socrata.soql.typed.CoreExpr
import com.socrata.soql.types.SoQLID.ClearNumberRep
import com.socrata.soql.types.{SoQLVersion, SoQLID, SoQLValue, SoQLType}
import com.socrata.soql.types.obfuscation.CryptProvider
import com.socrata.curator.{CuratorFromConfig, DiscoveryFromConfig}
@@ -145,13 +146,14 @@ class QueryServer(val dsInfo: DSInfo, val caseSensitivity: CaseSensitivity) exte
val analysisParam = servReq.getParameter("query")
val analysisStream = new ByteArrayInputStream(analysisParam.getBytes(StandardCharsets.ISO_8859_1))
val schemaHash = servReq.getParameter("schemaHash")
val analysis: SoQLAnalysis[UserColumnId, SoQLType] = SoQLAnalyzerHelper.deserializer(analysisStream)
val analysis: SoQLAnalysis[UserColumnId, SoQLType] = SoQLAnalyzerHelper.deserialize(analysisStream)
val reqRowCount = Option(servReq.getParameter("rowCount")).map(_ == "approximate").getOrElse(false)
val copy = Option(servReq.getParameter("copy"))
val rollupName = Option(servReq.getParameter("rollupName")).map(new RollupName(_))
val obfuscateId = !Option(servReq.getParameter("obfuscateId")).exists(_ == "false")

logger.info("Performing query on dataset " + datasetId)
streamQueryResults(analysis, datasetId, reqRowCount, copy, rollupName,
streamQueryResults(analysis, datasetId, reqRowCount, copy, rollupName, obfuscateId,
req.precondition, req.dateTimeHeader("If-Modified-Since"))
}

@@ -160,12 +162,13 @@ class QueryServer(val dsInfo: DSInfo, val caseSensitivity: CaseSensitivity) exte
* passed back to SocrataHttp so the transaction can be maintained through the duration of the
* streaming.
*/
def streamQueryResults(
def streamQueryResults( // scalastyle:ignore parameter.number
analysis: SoQLAnalysis[UserColumnId, SoQLType],
datasetId: String,
reqRowCount: Boolean,
copy: Option[String],
rollupName: Option[RollupName],
obfuscateId: Boolean,
precondition: Precondition,
ifModifiedSince: Option[DateTime]
) (resp:HttpServletResponse): Unit = {
@@ -178,7 +181,7 @@ class QueryServer(val dsInfo: DSInfo, val caseSensitivity: CaseSensitivity) exte
case Some(datasetInfo) =>
def notModified(etags: Seq[EntityTag]) = responses.NotModified ~> ETags(etags)
execQuery(pgu, datasetId, datasetInfo, analysis, reqRowCount, copy,
rollupName, precondition, ifModifiedSince) match {
rollupName, obfuscateId, precondition, ifModifiedSince) match {
case NotModified(etags) => notModified(etags)(resp)
case PreconditionFailed => responses.PreconditionFailed(resp)
case Success(qrySchema, copyNumber, dataVersion, results, etag, lastModified) =>
@@ -190,7 +193,7 @@ class QueryServer(val dsInfo: DSInfo, val caseSensitivity: CaseSensitivity) exte
rollupName.foreach(r => Header("X-SODA2-Rollup", r.underlying)(resp))
for { r <- results } yield {
CJSONWriter.writeCJson(datasetInfo, qrySchema,
r, reqRowCount, r.rowCount, dataVersion, lastModified)(resp)
r, reqRowCount, r.rowCount, dataVersion, lastModified, obfuscateId)(resp)
}
}
}
@@ -206,6 +209,7 @@ class QueryServer(val dsInfo: DSInfo, val caseSensitivity: CaseSensitivity) exte
rowCount: Boolean,
reqCopy: Option[String],
rollupName: Option[RollupName],
obfuscateId: Boolean,
precondition: Precondition,
ifModifiedSince: Option[DateTime]
): QueryResult = {
@@ -216,7 +220,8 @@ class QueryServer(val dsInfo: DSInfo, val caseSensitivity: CaseSensitivity) exte
rowCount: Boolean) = {
val cryptProvider = new CryptProvider(latestCopy.datasetInfo.obfuscationKey)
val sqlCtx = Map[SqlizerContext, Any](
SqlizerContext.IdRep -> new SoQLID.StringRep(cryptProvider),
SqlizerContext.IdRep -> (if (obfuscateId) { new SoQLID.StringRep(cryptProvider) }
else { new ClearNumberRep(cryptProvider) }),
SqlizerContext.VerRep -> new SoQLVersion.StringRep(cryptProvider),
SqlizerContext.CaseSensitivity -> caseSensitivity
)
@@ -1,4 +1,5 @@
[{ "category" : "Advanced", "count" : 1}
[{ "category" : null, "count" : 1}
,{ "category" : "Advanced", "count" : 1}
,{ "category" : "Beginner", "count" : 3 }
,{ "category" : "Intermediate", "count" : 10 }
]
@@ -3,5 +3,5 @@
,{"count":2,"date_trunc_y_available":"2011-01-01T00:00:00.000"}
,{"count":3,"date_trunc_y_available":"2012-01-01T00:00:00.000"}
,{"count":2,"date_trunc_y_available":"2013-01-01T00:00:00.000"}
,{"count":5,"date_trunc_y_available":null}
,{"count":6,"date_trunc_y_available":null}
]
@@ -12,7 +12,7 @@
"brand_made": "ISRAEL'S APCO"
},
{
"count_name": 5,
"count_name": 6,
"brand_made": null
}
]
]
@@ -8,23 +8,27 @@
"count_name": 3
},
{
"umake": "OZONE",
"umake": "GIN",
"count_name": 2
},
{
"umake": "GIN",
"umake": "OZONE",
"count_name": 2
},
{
"umake": "FOXTROT",
"umake": "ATHING",
"count_name": 1
},
{
"umake": "YANKEE",
"umake": "FOXTROT",
"count_name": 1
},
{
"umake": "HOTEL",
"count_name": 1
},
{
"umake": "YANKEE",
"count_name": 1
}
]
]